remove queryservice global stats variables

1. Remove the global stats variables defined in query_engine.go.
2. Introduce a QueryServiceStats struct as the holder of these stats.
3. Use dependency injection to push the QueryServiceStats instance from
   QueryEngine down to ConnPool, DBConn, TxPool, etc. (a minimal sketch of this
   pattern is shown right after the commit metadata below).
Shengzhe Yao 2015-04-28 21:13:27 -07:00
Parent d62240abc8
Commit f7b469540a
26 changed files with 433 additions and 319 deletions
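
For readers skimming the diffs below, here is a minimal, self-contained sketch of the injection pattern this commit applies. It is an illustration only: the Counters type stands in for vitess' stats.Counters, both structs are trimmed to a single field, and the constructor signatures are simplified, so the names match the diff but the signatures do not.

package main

import (
    "fmt"
    "sync"
)

// Counters stands in for the stats.Counters type used throughout the diff:
// a concurrency-safe map of named counters.
type Counters struct {
    mu     sync.Mutex
    counts map[string]int64
}

func NewCounters() *Counters {
    return &Counters{counts: make(map[string]int64)}
}

// Add increments a named counter.
func (c *Counters) Add(key string, delta int64) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.counts[key] += delta
}

// Counts returns a copy of the current counter values.
func (c *Counters) Counts() map[string]int64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    out := make(map[string]int64, len(c.counts))
    for k, v := range c.counts {
        out[k] = v
    }
    return out
}

// QueryServiceStats is the holder this commit introduces, reduced to one field.
type QueryServiceStats struct {
    InternalErrors *Counters
}

func NewQueryServiceStats() *QueryServiceStats {
    return &QueryServiceStats{InternalErrors: NewCounters()}
}

// ConnPool no longer touches a package-level internalErrors variable;
// the stats holder is pushed in through its constructor.
type ConnPool struct {
    queryServiceStats *QueryServiceStats
}

func NewConnPool(qStats *QueryServiceStats) *ConnPool {
    return &ConnPool{queryServiceStats: qStats}
}

// recordFailure shows a component reporting into the injected holder.
func (cp *ConnPool) recordFailure() {
    cp.queryServiceStats.InternalErrors.Add("Example", 1)
}

func main() {
    // The query engine owns exactly one QueryServiceStats and hands it to every component.
    qStats := NewQueryServiceStats()
    pool := NewConnPool(qStats)
    pool.recordFailure()
    fmt.Println(qStats.InternalErrors.Counts()["Example"]) // prints 1
}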

View file

@ -24,17 +24,18 @@ import (
// CachePool re-exposes ResourcePool as a pool of Memcache connection objects.
type CachePool struct {
name string
pool *pools.ResourcePool
maxPrefix sync2.AtomicInt64
cmd *exec.Cmd
rowCacheConfig RowCacheConfig
capacity int
socket string
idleTimeout time.Duration
memcacheStats *MemcacheStats
mu sync.Mutex
statsURL string
name string
pool *pools.ResourcePool
maxPrefix sync2.AtomicInt64
cmd *exec.Cmd
rowCacheConfig RowCacheConfig
capacity int
socket string
idleTimeout time.Duration
memcacheStats *MemcacheStats
queryServiceStats *QueryServiceStats
mu sync.Mutex
statsURL string
}
// NewCachePool creates a new pool for rowcache connections.
@ -43,11 +44,18 @@ func NewCachePool(
rowCacheConfig RowCacheConfig,
idleTimeout time.Duration,
statsURL string,
enablePublishStats bool) *CachePool {
cp := &CachePool{name: name, idleTimeout: idleTimeout, statsURL: statsURL}
enablePublishStats bool,
queryServiceStats *QueryServiceStats) *CachePool {
cp := &CachePool{
name: name,
idleTimeout: idleTimeout,
statsURL: statsURL,
queryServiceStats: queryServiceStats,
}
if name != "" && enablePublishStats {
cp.memcacheStats = NewMemcacheStats(
rowCacheConfig.StatsPrefix+name, 10*time.Second, enableMain,
queryServiceStats,
func(key string) string {
conn := cp.Get(context.Background())
// This is not the same as defer cachePool.Put(conn)
@ -57,7 +65,7 @@ func NewCachePool(
conn.Close()
conn = nil
log.Errorf("Cannot export memcache %v stats: %v", key, err)
internalErrors.Add("MemcacheStats", 1)
queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return ""
}
return string(stats)

View file

@ -230,11 +230,11 @@ func TestCachePoolMemcacheStatsFail(t *testing.T) {
cachePool.idleTimeout = idleTimeout
cachePool.Open()
defer cachePool.Close()
memcacheStatsBefore := internalErrors.Counts()["MemcacheStats"]
memcacheStatsBefore := cachePool.queryServiceStats.InternalErrors.Counts()["MemcacheStats"]
// any memcache calls should fail
cache.EnableCacheServiceError()
cachePool.memcacheStats.update()
memcacheStatsAfter := internalErrors.Counts()["MemcacheStats"]
memcacheStatsAfter := cachePool.queryServiceStats.InternalErrors.Counts()["MemcacheStats"]
if memcacheStatsAfter <= memcacheStatsBefore {
t.Fatalf("memcache stats should cause an internal error")
}
@ -261,5 +261,12 @@ func newTestCachePool(rowcacheConfig RowCacheConfig, enablePublishStats bool) *C
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-%d-", randID)
statsURL := fmt.Sprintf("/debug/cache-%d/", randID)
return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL, enablePublishStats)
queryServiceStats := NewQueryServiceStats(name, enablePublishStats)
return NewCachePool(
name,
rowcacheConfig,
1*time.Second,
statsURL,
enablePublishStats,
queryServiceStats)
}

View file

@ -276,7 +276,7 @@ func applyFilterWithPKDefaults(tableInfo *TableInfo, columnNumbers []int, input
return output
}
func validateKey(tableInfo *TableInfo, key string) (newKey string) {
func validateKey(tableInfo *TableInfo, key string, qStats *QueryServiceStats) (newKey string) {
if key == "" {
// TODO: Verify auto-increment table
return
@ -292,7 +292,7 @@ func validateKey(tableInfo *TableInfo, key string) (newKey string) {
s, err := base64.StdEncoding.DecodeString(piece[1 : len(piece)-1])
if err != nil {
log.Warningf("Error decoding key %s for table %s: %v", key, tableInfo.Name, err)
internalErrors.Add("Mismatch", 1)
qStats.InternalErrors.Add("Mismatch", 1)
return
}
pkValues[i] = sqltypes.MakeString(s)
@ -303,7 +303,7 @@ func validateKey(tableInfo *TableInfo, key string) (newKey string) {
n, err := sqltypes.BuildNumeric(piece)
if err != nil {
log.Warningf("Error decoding key %s for table %s: %v", key, tableInfo.Name, err)
internalErrors.Add("Mismatch", 1)
qStats.InternalErrors.Add("Mismatch", 1)
return
}
pkValues[i] = n
@ -311,7 +311,7 @@ func validateKey(tableInfo *TableInfo, key string) (newKey string) {
}
if newKey = buildKey(pkValues); newKey != key {
log.Warningf("Error: Key mismatch, received: %s, computed: %s", key, newKey)
internalErrors.Add("Mismatch", 1)
qStats.InternalErrors.Add("Mismatch", 1)
}
return newKey
}

View file

@ -420,33 +420,34 @@ func TestCodexApplyFilterWithPKDefaults(t *testing.T) {
func TestCodexValidateKey(t *testing.T) {
testUtils := newTestUtils()
queryServiceStats := NewQueryServiceStats("", false)
tableInfo := createTableInfo("Table",
map[string]string{"pk1": "int", "pk2": "varbinary(128)", "col1": "int"},
[]string{"pk1", "pk2"})
// validate empty key
newKey := validateKey(&tableInfo, "")
newKey := validateKey(&tableInfo, "", queryServiceStats)
testUtils.checkEqual(t, "", newKey)
// validate keys that do not match number of pk columns
newKey = validateKey(&tableInfo, "1")
newKey = validateKey(&tableInfo, "1", queryServiceStats)
testUtils.checkEqual(t, "", newKey)
newKey = validateKey(&tableInfo, "1.2.3")
newKey = validateKey(&tableInfo, "1.2.3", queryServiceStats)
testUtils.checkEqual(t, "", newKey)
// validate keys with null
newKey = validateKey(&tableInfo, "'MQ=='.null")
newKey = validateKey(&tableInfo, "'MQ=='.null", queryServiceStats)
testUtils.checkEqual(t, "", newKey)
// validate keys with invalid base64 encoded string
newKey = validateKey(&tableInfo, "'MQ==<'.2")
newKey = validateKey(&tableInfo, "'MQ==<'.2", queryServiceStats)
testUtils.checkEqual(t, "", newKey)
// validate keys with invalid value
mismatchCounterBefore := internalErrors.Counts()["Mismatch"]
newKey = validateKey(&tableInfo, "not_a_number.2")
mismatchCounterAfter := internalErrors.Counts()["Mismatch"]
mismatchCounterBefore := queryServiceStats.InternalErrors.Counts()["Mismatch"]
newKey = validateKey(&tableInfo, "not_a_number.2", queryServiceStats)
mismatchCounterAfter := queryServiceStats.InternalErrors.Counts()["Mismatch"]
if mismatchCounterAfter-mismatchCounterBefore != 1 {
t.Fatalf("Mismatch counter should increase by one. Mismatch counter before: %d, after: %d, diff: %d", mismatchCounterBefore, mismatchCounterAfter, mismatchCounterAfter-mismatchCounterBefore)
}
testUtils.checkEqual(t, "", newKey)
// validate valid keys
newKey = validateKey(&tableInfo, "1.2")
newKey = validateKey(&tableInfo, "1.2", queryServiceStats)
testUtils.checkEqual(t, "1.2", newKey)
}

View file

@ -22,11 +22,12 @@ import (
// Other than the connection type, ConnPool maintains an additional
// pool of dba connections that are used to kill connections.
type ConnPool struct {
mu sync.Mutex
connections *pools.ResourcePool
capacity int
idleTimeout time.Duration
dbaPool *dbconnpool.ConnectionPool
mu sync.Mutex
connections *pools.ResourcePool
capacity int
idleTimeout time.Duration
dbaPool *dbconnpool.ConnectionPool
queryServiceStats *QueryServiceStats
}
// NewConnPool creates a new ConnPool. The name is used
@ -35,11 +36,13 @@ func NewConnPool(
name string,
capacity int,
idleTimeout time.Duration,
enablePublishStats bool) *ConnPool {
enablePublishStats bool,
queryServiceStats *QueryServiceStats) *ConnPool {
cp := &ConnPool{
capacity: capacity,
idleTimeout: idleTimeout,
dbaPool: dbconnpool.NewConnectionPool("", 1, idleTimeout),
capacity: capacity,
idleTimeout: idleTimeout,
dbaPool: dbconnpool.NewConnectionPool("", 1, idleTimeout),
queryServiceStats: queryServiceStats,
}
if name == "" {
return cp
@ -68,10 +71,10 @@ func (cp *ConnPool) Open(appParams, dbaParams *sqldb.ConnParams) {
defer cp.mu.Unlock()
f := func() (pools.Resource, error) {
return NewDBConn(cp, appParams, dbaParams)
return NewDBConn(cp, appParams, dbaParams, cp.queryServiceStats)
}
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout)
cp.dbaPool.Open(dbconnpool.DBConnectionCreator(dbaParams, mysqlStats))
cp.dbaPool.Open(dbconnpool.DBConnectionCreator(dbaParams, cp.queryServiceStats.MySQLStats))
}
// Close will close the pool and wait for connections to be returned before

View file

@ -15,7 +15,8 @@ import (
func TestConnPoolTryGetWhilePoolIsClosed(t *testing.T) {
fakesqldb.Register()
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
testUtils := newTestUtils()
connPool := testUtils.newConnPool()
_, err := connPool.TryGet()
if err != ErrConnPoolClosed {
t.Fatalf("pool is closed, should get ErrConnPoolClosed")
@ -24,10 +25,11 @@ func TestConnPoolTryGetWhilePoolIsClosed(t *testing.T) {
func TestConnPoolTryGetWhenFailedToConnectToDB(t *testing.T) {
db := fakesqldb.Register()
testUtils := newTestUtils()
db.EnableConnFail()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
connPool.Open(appParams, dbaParams)
defer connPool.Close()
_, err := connPool.TryGet()
@ -38,9 +40,10 @@ func TestConnPoolTryGetWhenFailedToConnectToDB(t *testing.T) {
func TestConnPoolTryGet(t *testing.T) {
fakesqldb.Register()
testUtils := newTestUtils()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
connPool.Open(appParams, dbaParams)
defer connPool.Close()
dbConn, err := connPool.TryGet()
@ -55,9 +58,10 @@ func TestConnPoolTryGet(t *testing.T) {
func TestConnPoolGet(t *testing.T) {
fakesqldb.Register()
testUtils := newTestUtils()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
connPool.Open(appParams, dbaParams)
defer connPool.Close()
dbConn, err := connPool.Get(context.Background())
@ -72,7 +76,8 @@ func TestConnPoolGet(t *testing.T) {
func TestConnPoolPutWhilePoolIsClosed(t *testing.T) {
fakesqldb.Register()
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
testUtils := newTestUtils()
connPool := testUtils.newConnPool()
defer func() {
if recover() == nil {
t.Fatalf("pool is closed, should get an error")
@ -83,9 +88,10 @@ func TestConnPoolPutWhilePoolIsClosed(t *testing.T) {
func TestConnPoolSetCapacity(t *testing.T) {
fakesqldb.Register()
testUtils := newTestUtils()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
connPool.Open(appParams, dbaParams)
defer connPool.Close()
err := connPool.SetCapacity(-10)
@ -103,7 +109,8 @@ func TestConnPoolSetCapacity(t *testing.T) {
func TestConnPoolStatJSON(t *testing.T) {
fakesqldb.Register()
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
testUtils := newTestUtils()
connPool := testUtils.newConnPool()
if connPool.StatsJSON() != "{}" {
t.Fatalf("pool is closed, stats json should be empty: {}")
}
@ -119,7 +126,8 @@ func TestConnPoolStatJSON(t *testing.T) {
func TestConnPoolStateWhilePoolIsClosed(t *testing.T) {
fakesqldb.Register()
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
testUtils := newTestUtils()
connPool := testUtils.newConnPool()
if connPool.Capacity() != 0 {
t.Fatalf("pool capacity should be 0 because it is still closed")
}
@ -142,10 +150,11 @@ func TestConnPoolStateWhilePoolIsClosed(t *testing.T) {
func TestConnPoolStateWhilePoolIsOpen(t *testing.T) {
fakesqldb.Register()
testUtils := newTestUtils()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
idleTimeout := 10 * time.Second
connPool := NewConnPool("ConnPool", 100, idleTimeout, false)
connPool := testUtils.newConnPool()
connPool.Open(appParams, dbaParams)
defer connPool.Close()
if connPool.Capacity() != 100 {

View file

@ -22,24 +22,29 @@ import (
// its own queries and the underlying connection.
// It will also trigger a CheckMySQL whenever applicable.
type DBConn struct {
conn *dbconnpool.DBConnection
info *sqldb.ConnParams
pool *ConnPool
current sync2.AtomicString
conn *dbconnpool.DBConnection
info *sqldb.ConnParams
pool *ConnPool
queryServiceStats *QueryServiceStats
current sync2.AtomicString
}
// NewDBConn creates a new DBConn. It triggers a CheckMySQL if creation fails.
func NewDBConn(cp *ConnPool, appParams, dbaParams *sqldb.ConnParams) (*DBConn, error) {
c, err := dbconnpool.NewDBConnection(appParams, mysqlStats)
func NewDBConn(
cp *ConnPool,
appParams,
dbaParams *sqldb.ConnParams,
qStats *QueryServiceStats) (*DBConn, error) {
c, err := dbconnpool.NewDBConnection(appParams, qStats.MySQLStats)
if err != nil {
go checkMySQL()
return nil, err
}
return &DBConn{
conn: c,
info: appParams,
pool: cp,
conn: c,
info: appParams,
pool: cp,
queryServiceStats: qStats,
}, nil
}
@ -123,7 +128,7 @@ func (dbc *DBConn) Recycle() {
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *DBConn) Kill() error {
killStats.Add("Queries", 1)
dbc.queryServiceStats.KillStats.Add("Queries", 1)
log.Infof("killing query %s", dbc.Current())
killConn, err := dbc.pool.dbaPool.Get(0)
if err != nil {
@ -152,7 +157,7 @@ func (dbc *DBConn) ID() int64 {
func (dbc *DBConn) reconnect() error {
dbc.conn.Close()
newConn, err := dbconnpool.NewDBConnection(dbc.info, mysqlStats)
newConn, err := dbconnpool.NewDBConnection(dbc.info, dbc.queryServiceStats.MySQLStats)
if err != nil {
return err
}
@ -185,7 +190,7 @@ func (dbc *DBConn) setDeadline(ctx context.Context) chan bool {
defer tmr2.Stop()
select {
case <-tmr2.C:
internalErrors.Add("HungQuery", 1)
dbc.queryServiceStats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.Current())
case <-done:
return

View file

@ -27,14 +27,15 @@ func TestDBConnExec(t *testing.T) {
},
}
db.AddQuery(sql, expectedResult)
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool.Open(appParams, dbaParams)
defer connPool.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
defer cancel()
dbConn, err := NewDBConn(connPool, appParams, dbaParams)
queryServiceStats := NewQueryServiceStats("", false)
dbConn, err := NewDBConn(connPool, appParams, dbaParams, queryServiceStats)
defer dbConn.Close()
if err != nil {
t.Fatalf("should not get an error, err: %v", err)
@ -55,18 +56,19 @@ func TestDBConnExec(t *testing.T) {
func TestDBConnKill(t *testing.T) {
db := fakesqldb.Register()
testUtils := newTestUtils()
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool.Open(appParams, dbaParams)
defer connPool.Close()
dbConn, _ := NewDBConn(connPool, appParams, dbaParams)
queryServiceStats := NewQueryServiceStats("", false)
dbConn, err := NewDBConn(connPool, appParams, dbaParams, queryServiceStats)
defer dbConn.Close()
query := fmt.Sprintf("kill %d", dbConn.ID())
db.AddQuery(query, &mproto.QueryResult{})
// Kill failed because we are not able to connect to the database
db.EnableConnFail()
err := dbConn.Kill()
err = dbConn.Kill()
testUtils.checkTabletError(t, err, ErrFail, "Failed to get conn from dba pool")
db.DisableConnFail()
@ -99,17 +101,18 @@ func TestDBConnStream(t *testing.T) {
},
}
db.AddQuery(sql, expectedResult)
connPool := NewConnPool("ConnPool", 100, 10*time.Second, false)
connPool := testUtils.newConnPool()
appParams := &sqldb.ConnParams{}
dbaParams := &sqldb.ConnParams{}
connPool.Open(appParams, dbaParams)
defer connPool.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
defer cancel()
dbConn, _ := NewDBConn(connPool, appParams, dbaParams)
queryServiceStats := NewQueryServiceStats("", false)
dbConn, err := NewDBConn(connPool, appParams, dbaParams, queryServiceStats)
defer dbConn.Close()
var result mproto.QueryResult
err := dbConn.Stream(
err = dbConn.Stream(
ctx, sql, func(r *mproto.QueryResult) error {
result = *r
return nil

View file

@ -107,14 +107,15 @@ type RetrieveCacheStats func(key string) string
// MemcacheStats exports the Memcache internal stats through stats package.
type MemcacheStats struct {
ticks *timer.Timer
mu sync.Mutex
main map[string]string
slabs map[string]map[string]int64
items map[string]map[string]int64
statsPrefix string
statsFunc RetrieveCacheStats
flags int64
ticks *timer.Timer
mu sync.Mutex
main map[string]string
slabs map[string]map[string]int64
items map[string]map[string]int64
statsPrefix string
statsFunc RetrieveCacheStats
queryServiceStats *QueryServiceStats
flags int64
}
const (
@ -129,15 +130,17 @@ func NewMemcacheStats(
statsPrefix string,
refreshFreq time.Duration,
flags int64,
queryServiceStats *QueryServiceStats,
statsFunc RetrieveCacheStats) *MemcacheStats {
memstats := &MemcacheStats{
ticks: timer.NewTimer(refreshFreq),
statsPrefix: statsPrefix,
statsFunc: statsFunc,
main: make(map[string]string),
slabs: make(map[string]map[string]int64),
items: make(map[string]map[string]int64),
flags: flags,
ticks: timer.NewTimer(refreshFreq),
statsPrefix: statsPrefix,
statsFunc: statsFunc,
main: make(map[string]string),
slabs: make(map[string]map[string]int64),
items: make(map[string]map[string]int64),
queryServiceStats: queryServiceStats,
flags: flags,
}
if flags&enableMain > 0 {
memstats.publishMainStats()
@ -209,7 +212,7 @@ func (memstats *MemcacheStats) publishMainStats() {
ival, err := strconv.ParseInt(memstats.main[key], 10, 64)
if err != nil {
log.Errorf("value '%v' for key %v is not an int", memstats.main[key], key)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return -1
}
return ival
@ -251,14 +254,14 @@ func (memstats *MemcacheStats) updateSlabsStats() {
ival, err := strconv.ParseInt(sValue, 10, 64)
if err != nil {
log.Error(err)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
if slabsSingleMetrics[sKey] {
m, ok := memstats.slabs[sKey]
if !ok {
log.Errorf("Unknown memcache slabs stats %v: %v", sKey, ival)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
m[""] = ival
@ -267,13 +270,13 @@ func (memstats *MemcacheStats) updateSlabsStats() {
subkey, slabid, err := parseSlabKey(sKey)
if err != nil {
log.Error(err)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
m, ok := memstats.slabs[subkey]
if !ok {
log.Errorf("Unknown memcache slabs stats %v %v: %v", subkey, slabid, ival)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
m[slabid] = ival
@ -299,19 +302,19 @@ func (memstats *MemcacheStats) updateItemsStats() {
ival, err := strconv.ParseInt(sValue, 10, 64)
if err != nil {
log.Error(err)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
subkey, slabid, err := parseItemKey(sKey)
if err != nil {
log.Error(err)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
m, ok := memstats.items[subkey]
if !ok {
log.Errorf("Unknown memcache items stats %v %v: %v", subkey, slabid, ival)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
return
}
m[slabid] = ival
@ -327,7 +330,7 @@ func (memstats *MemcacheStats) readStats(k string, proc func(key, value string))
} else {
log.Errorf("Could not read memcache stats: %v", x)
}
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
}
}()
@ -348,7 +351,7 @@ func (memstats *MemcacheStats) readStats(k string, proc func(key, value string))
// so less than 3 would be compatible with the original memcached
if len(items) < 3 {
log.Errorf("Unexpected stats: %v", line)
internalErrors.Add("MemcacheStats", 1)
memstats.queryServiceStats.InternalErrors.Add("MemcacheStats", 1)
continue
}
proc(items[1], items[2])

View file

@ -15,7 +15,7 @@ import (
func TestMemcacheStats(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableMain,
statsPrefix, 1*time.Second, enableMain, NewQueryServiceStats("", false),
func(key string) string {
switch key {
case "slabs":
@ -35,7 +35,7 @@ func TestMemcacheStats(t *testing.T) {
func TestMemcacheStatsInvalidMainStatsValueType(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableMain,
statsPrefix, 1*time.Second, enableMain, NewQueryServiceStats("", false),
func(key string) string {
switch key {
case "slabs":
@ -57,7 +57,7 @@ func TestMemcacheStatsInvalidMainStatsValueType(t *testing.T) {
func TestMemcacheStatsSlabsStats(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableSlabs,
statsPrefix, 1*time.Second, enableSlabs, NewQueryServiceStats("", false),
func(key string) string {
switch key {
case "slabs":
@ -95,7 +95,7 @@ func TestMemcacheStatsSlabsStats(t *testing.T) {
func TestMemcacheStatsItemsStats(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableItems,
statsPrefix, 1*time.Second, enableItems, NewQueryServiceStats("", false),
func(key string) string {
switch key {
case "slabs":
@ -134,17 +134,18 @@ func TestMemcacheStatsItemsStats(t *testing.T) {
func TestMemcacheStatsPanic(t *testing.T) {
statsPrefix := newStatsPrefix()
queryServiceStats := NewQueryServiceStats("", false)
memcacheStats := NewMemcacheStats(
statsPrefix, 100*time.Second, enableMain,
statsPrefix, 100*time.Second, enableMain, queryServiceStats,
func(key string) string {
panic("unknown error")
},
)
errCountBefore := internalErrors.Counts()["MemcacheStats"]
errCountBefore := queryServiceStats.InternalErrors.Counts()["MemcacheStats"]
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
errCountAfter := internalErrors.Counts()["MemcacheStats"]
errCountAfter := queryServiceStats.InternalErrors.Counts()["MemcacheStats"]
if errCountAfter-errCountBefore != 1 {
t.Fatalf("got unknown panic, MemcacheStats counter should increase by 1")
}
@ -152,17 +153,18 @@ func TestMemcacheStatsPanic(t *testing.T) {
func TestMemcacheStatsTabletError(t *testing.T) {
statsPrefix := newStatsPrefix()
queryServiceStats := NewQueryServiceStats("", false)
memcacheStats := NewMemcacheStats(
statsPrefix, 100*time.Second, enableMain,
statsPrefix, 100*time.Second, enableMain, queryServiceStats,
func(key string) string {
panic(NewTabletError(ErrFail, "unknown tablet error"))
},
)
errCountBefore := internalErrors.Counts()["MemcacheStats"]
errCountBefore := queryServiceStats.InternalErrors.Counts()["MemcacheStats"]
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
errCountAfter := internalErrors.Counts()["MemcacheStats"]
errCountAfter := queryServiceStats.InternalErrors.Counts()["MemcacheStats"]
if errCountAfter-errCountBefore != 1 {
t.Fatalf("got tablet error, MemcacheStats counter should increase by 1")
}

View file

@ -64,8 +64,11 @@ type QueryEngine struct {
streamBufferSize sync2.AtomicInt64
strictTableAcl bool
// loggers
// Loggers
accessCheckerLogger *logutil.ThrottledLogger
// Stats
queryServiceStats *QueryServiceStats
}
type compiledPlan struct {
@ -75,22 +78,6 @@ type compiledPlan struct {
TransactionID int64
}
var (
// stats are globals to allow anybody to set them
mysqlStats *stats.Timings
queryStats *stats.Timings
waitStats *stats.Timings
killStats *stats.Counters
infoErrors *stats.Counters
errorStats *stats.Counters
internalErrors *stats.Counters
resultStats *stats.Histogram
spotCheckCount *stats.Int
qpsRates *stats.Rates
resultBuckets = []int64{0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10000}
)
// CacheInvalidator provides the abstraction needed for an instant invalidation
// vs. delayed invalidation in the case of in-transaction dmls
type CacheInvalidator interface {
@ -114,6 +101,7 @@ func getOrPanic(ctx context.Context, pool *ConnPool) *DBConn {
// You must call this only once.
func NewQueryEngine(config Config) *QueryEngine {
qe := &QueryEngine{}
qe.queryServiceStats = NewQueryServiceStats(config.StatsPrefix, config.EnablePublishStats)
qe.schemaInfo = NewSchemaInfo(
config.QueryCacheSize,
config.StatsPrefix,
@ -126,6 +114,7 @@ func NewQueryEngine(config Config) *QueryEngine {
time.Duration(config.SchemaReloadTime*1e9),
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
qe.queryServiceStats,
)
// Pools
@ -135,18 +124,21 @@ func NewQueryEngine(config Config) *QueryEngine {
time.Duration(config.IdleTimeout*1e9),
config.DebugURLPrefix+"/memcache/",
config.EnablePublishStats,
qe.queryServiceStats,
)
qe.connPool = NewConnPool(
config.PoolNamePrefix+"ConnPool",
config.PoolSize,
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
qe.queryServiceStats,
)
qe.streamConnPool = NewConnPool(
config.PoolNamePrefix+"StreamConnPool",
config.StreamPoolSize,
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
qe.queryServiceStats,
)
// Services
@ -158,6 +150,7 @@ func NewQueryEngine(config Config) *QueryEngine {
time.Duration(config.TxPoolTimeout*1e9),
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
qe.queryServiceStats,
)
qe.consolidator = sync2.NewConsolidator()
http.Handle(config.DebugURLPrefix+"/consolidations", qe.consolidator)
@ -175,21 +168,10 @@ func NewQueryEngine(config Config) *QueryEngine {
qe.maxDMLRows = sync2.AtomicInt64(config.MaxDMLRows)
qe.streamBufferSize = sync2.AtomicInt64(config.StreamBufferSize)
// loggers
// Loggers
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)
// Stats
mysqlStatsName := ""
queryStatsName := ""
qpsRateName := ""
waitStatsName := ""
killStatsName := ""
infoErrorsName := ""
errorStatsName := ""
internalErrorsName := ""
resultStatsName := ""
spotCheckCountName := ""
if config.EnablePublishStats {
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
@ -198,29 +180,7 @@ func NewQueryEngine(config Config) *QueryEngine {
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
mysqlStatsName = config.StatsPrefix + "Mysql"
queryStatsName = config.StatsPrefix + "Queries"
qpsRateName = config.StatsPrefix + "QPS"
waitStatsName = config.StatsPrefix + "Waits"
killStatsName = config.StatsPrefix + "Kills"
infoErrorsName = config.StatsPrefix + "InfoErrors"
errorStatsName = config.StatsPrefix + "Errors"
internalErrorsName = config.StatsPrefix + "InternalErrors"
resultStatsName = config.StatsPrefix + "Results"
spotCheckCountName = config.StatsPrefix + "RowcacheSpotCheckCount"
}
mysqlStats = stats.NewTimings(mysqlStatsName)
queryStats = stats.NewTimings(queryStatsName)
qpsRates = stats.NewRates(qpsRateName, queryStats, 15, 60*time.Second)
waitStats = stats.NewTimings(waitStatsName)
killStats = stats.NewCounters(killStatsName)
infoErrors = stats.NewCounters(infoErrorsName)
errorStats = stats.NewCounters(errorStatsName)
internalErrors = stats.NewCounters(internalErrorsName)
resultStats = stats.NewHistogram(resultStatsName, resultBuckets)
spotCheckCount = stats.NewInt(spotCheckCountName)
return qe
}
@ -281,7 +241,7 @@ func (qe *QueryEngine) Launch(f func()) {
defer func() {
qe.tasks.Done()
if x := recover(); x != nil {
internalErrors.Add("Task", 1)
qe.queryServiceStats.InternalErrors.Add("Task", 1)
log.Errorf("task error: %v", x)
}
}()
@ -291,7 +251,7 @@ func (qe *QueryEngine) Launch(f func()) {
// CheckMySQL returns true if we can connect to MySQL.
func (qe *QueryEngine) CheckMySQL() bool {
conn, err := dbconnpool.NewDBConnection(&qe.dbconfigs.App.ConnParams, mysqlStats)
conn, err := dbconnpool.NewDBConnection(&qe.dbconfigs.App.ConnParams, qe.queryServiceStats.MySQLStats)
if err != nil {
if IsConnErr(err) {
return false

View file

@ -44,7 +44,7 @@ func (qre *QueryExecutor) Execute() (reply *mproto.QueryResult) {
qre.logStats.PlanType = planName
defer func(start time.Time) {
duration := time.Now().Sub(start)
queryStats.Add(planName, duration)
qre.qe.queryServiceStats.QueryStats.Add(planName, duration)
if reply == nil {
qre.plan.AddStats(1, duration, 0, 1)
return
@ -52,7 +52,7 @@ func (qre *QueryExecutor) Execute() (reply *mproto.QueryResult) {
qre.plan.AddStats(1, duration, int64(reply.RowsAffected), 0)
qre.logStats.RowsAffected = int(reply.RowsAffected)
qre.logStats.Rows = reply.Rows
resultStats.Add(int64(len(reply.Rows)))
qre.qe.queryServiceStats.ResultStats.Add(int64(len(reply.Rows)))
}(time.Now())
qre.checkPermissions()
@ -117,7 +117,7 @@ func (qre *QueryExecutor) Execute() (reply *mproto.QueryResult) {
func (qre *QueryExecutor) Stream(sendReply func(*mproto.QueryResult) error) {
qre.logStats.OriginalSql = qre.query
qre.logStats.PlanType = qre.plan.PlanId.String()
defer queryStats.Record(qre.plan.PlanId.String(), time.Now())
defer qre.qe.queryServiceStats.QueryStats.Record(qre.plan.PlanId.String(), time.Now())
qre.checkPermissions()
@ -272,7 +272,7 @@ func (qre *QueryExecutor) mustVerify() bool {
}
func (qre *QueryExecutor) spotCheck(rcresult RCResult, pk []sqltypes.Value) {
spotCheckCount.Add(1)
qre.qe.queryServiceStats.SpotCheckCount.Add(1)
bv := map[string]interface{}{
"#pk": sqlparser.TupleEqualityList{
Columns: qre.plan.TableInfo.Indexes[0].Columns,
@ -300,7 +300,7 @@ func (qre *QueryExecutor) recheckLater(rcresult RCResult, dbrow []sqltypes.Value
}
log.Warningf("query: %v", qre.plan.FullQuery)
log.Warningf("mismatch for: %v\ncache: %v\ndb: %v", pk, rcresult.Row, dbrow)
internalErrors.Add("Mismatch", 1)
qre.qe.queryServiceStats.InternalErrors.Add("Mismatch", 1)
}
// execDirect always sends the query to mysql
@ -541,7 +541,7 @@ func (qre *QueryExecutor) qFetch(logStats *SQLQueryStats, parsedQuery *sqlparser
logStats.QuerySources |= QuerySourceConsolidator
startTime := time.Now()
q.Wait()
waitStats.Record("Consolidations", startTime)
qre.qe.queryServiceStats.WaitStats.Record("Consolidations", startTime)
}
if q.Err != nil {
panic(q.Err)

View file

@ -0,0 +1,75 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletserver
import (
"time"
"github.com/youtube/vitess/go/stats"
)
// QueryServiceStats contains the stats used at the queryservice level.
type QueryServiceStats struct {
// MySQLStats shows the time histogram for operations spent on the MySQL side.
MySQLStats *stats.Timings
// QueryStats shows the time histogram for each type of query.
QueryStats *stats.Timings
// WaitStats shows the time histogram for wait operations.
WaitStats *stats.Timings
// KillStats shows the number of connections being killed.
KillStats *stats.Counters
// InfoErrors shows the number of various non-critical errors that happened.
InfoErrors *stats.Counters
// ErrorStats shows the number of critical errors that happened.
ErrorStats *stats.Counters
// InternalErrors shows the number of errors from internal components.
InternalErrors *stats.Counters
// QPSRates shows the QPS.
QPSRates *stats.Rates
// ResultStats shows the histogram of the number of rows returned.
ResultStats *stats.Histogram
// SpotCheckCount shows the number of spot check events that happened.
SpotCheckCount *stats.Int
}
// NewQueryServiceStats returns a new QueryServiceStats instance.
func NewQueryServiceStats(statsPrefix string, enablePublishStats bool) *QueryServiceStats {
mysqlStatsName := ""
queryStatsName := ""
qpsRateName := ""
waitStatsName := ""
killStatsName := ""
infoErrorsName := ""
errorStatsName := ""
internalErrorsName := ""
resultStatsName := ""
spotCheckCountName := ""
if enablePublishStats {
mysqlStatsName = statsPrefix + "Mysql"
queryStatsName = statsPrefix + "Queries"
qpsRateName = statsPrefix + "QPS"
waitStatsName = statsPrefix + "Waits"
killStatsName = statsPrefix + "Kills"
infoErrorsName = statsPrefix + "InfoErrors"
errorStatsName = statsPrefix + "Errors"
internalErrorsName = statsPrefix + "InternalErrors"
resultStatsName = statsPrefix + "Results"
spotCheckCountName = statsPrefix + "RowcacheSpotCheckCount"
}
resultBuckets := []int64{0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10000}
queryStats := stats.NewTimings(queryStatsName)
return &QueryServiceStats{
MySQLStats: stats.NewTimings(mysqlStatsName),
QueryStats: queryStats,
WaitStats: stats.NewTimings(waitStatsName),
KillStats: stats.NewCounters(killStatsName),
InfoErrors: stats.NewCounters(infoErrorsName),
ErrorStats: stats.NewCounters(errorStatsName),
InternalErrors: stats.NewCounters(internalErrorsName),
QPSRates: stats.NewRates(qpsRateName, queryStats, 15, 60*time.Second),
ResultStats: stats.NewHistogram(resultStatsName, resultBuckets),
SpotCheckCount: stats.NewInt(spotCheckCountName),
}
}
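
A usage note on the constructor above, offered as a reading of the code rather than documented behavior: when enablePublishStats is false, every stat keeps an empty name, which is how the tests in this commit create throwaway, unpublished instances. The sketch below mirrors those call sites; the import path is an assumption based on the go/vt/tabletserver layout, and the Add/Counts calls follow the ones used in the diffs.

package main

import (
    "fmt"

    // Import path assumed from the repository layout at the time of this commit.
    "github.com/youtube/vitess/go/vt/tabletserver"
)

func main() {
    // Unpublished instance, as the tests do with ("", false): nothing is registered globally.
    qsStats := tabletserver.NewQueryServiceStats("", false)
    qsStats.InternalErrors.Add("Mismatch", 1)
    fmt.Println(qsStats.InternalErrors.Counts()["Mismatch"]) // prints 1

    // A published instance, as NewQueryEngine does, would instead pass the configured
    // prefix and true, registering names such as "<prefix>InternalErrors":
    //   tabletserver.NewQueryServiceStats(config.StatsPrefix, config.EnablePublishStats)
}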

View file

@ -301,7 +301,7 @@ func (rqsc *realQueryServiceControl) AllowQueries(dbconfigs *dbconfigs.DBConfigs
// it has to wait for queries & transactions to be completed or killed,
// and also for house keeping goroutines to be terminated.
func (rqsc *realQueryServiceControl) DisallowQueries() {
defer logError()
defer logError(rqsc.sqlQueryRPCService.qe.queryServiceStats)
rqsc.sqlQueryRPCService.disallowQueries()
}
@ -312,7 +312,7 @@ func (rqsc *realQueryServiceControl) IsServing() bool {
// Reload the schema. If the query service is not running, nothing will happen
func (rqsc *realQueryServiceControl) ReloadSchema() {
defer logError()
defer logError(rqsc.sqlQueryRPCService.qe.queryServiceStats)
rqsc.sqlQueryRPCService.qe.schemaInfo.triggerReload()
}
@ -332,7 +332,7 @@ func (rqsc *realQueryServiceControl) registerCheckMySQL() {
time.Sleep(1 * time.Second)
checkMySLQThrottler.Release()
}()
defer logError()
defer logError(rqsc.sqlQueryRPCService.qe.queryServiceStats)
if rqsc.sqlQueryRPCService.checkMySQL() {
return
}

View file

@ -126,28 +126,28 @@ func (rci *RowcacheInvalidator) run(ctx *sync2.ServiceContext) error {
go checkMySQL()
}
log.Errorf("binlog.ServeUpdateStream returned err '%v', retrying in 1 second.", err.Error())
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
time.Sleep(1 * time.Second)
}
log.Infof("Rowcache invalidator stopped")
return nil
}
func handleInvalidationError(event *blproto.StreamEvent) {
func (rci *RowcacheInvalidator) handleInvalidationError(event *blproto.StreamEvent) {
if x := recover(); x != nil {
terr, ok := x.(*TabletError)
if !ok {
log.Errorf("Uncaught panic for %+v:\n%v\n%s", event, x, tb.Stack(4))
internalErrors.Add("Panic", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Panic", 1)
return
}
log.Errorf("%v: %+v", terr, event)
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
}
}
func (rci *RowcacheInvalidator) processEvent(event *blproto.StreamEvent) error {
defer handleInvalidationError(event)
defer rci.handleInvalidationError(event)
switch event.Category {
case "DDL":
log.Infof("DDL invalidation: %s", event.Sql)
@ -160,7 +160,7 @@ func (rci *RowcacheInvalidator) processEvent(event *blproto.StreamEvent) error {
rci.AppendGTID(event.GTIDField.Value)
default:
log.Errorf("unknown event: %#v", event)
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
return nil
}
rci.lagSeconds.Set(time.Now().Unix() - event.Timestamp)
@ -184,12 +184,12 @@ func (rci *RowcacheInvalidator) handleDMLEvent(event *blproto.StreamEvent) {
key, err := sqltypes.BuildValue(pkVal)
if err != nil {
log.Errorf("Error building invalidation key for %#v: '%v'", event, err)
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
return
}
sqlTypeKeys = append(sqlTypeKeys, key)
}
newKey := validateKey(tableInfo, buildKey(sqlTypeKeys))
newKey := validateKey(tableInfo, buildKey(sqlTypeKeys), rci.qe.queryServiceStats)
if newKey == "" {
continue
}
@ -217,7 +217,7 @@ func (rci *RowcacheInvalidator) handleUnrecognizedEvent(sql string) {
statement, err := sqlparser.Parse(sql)
if err != nil {
log.Errorf("Error: %v: %s", err, sql)
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
return
}
var table *sqlparser.TableName
@ -231,7 +231,7 @@ func (rci *RowcacheInvalidator) handleUnrecognizedEvent(sql string) {
table = stmt.Table
default:
log.Errorf("Unrecognized: %s", sql)
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
return
}
@ -245,7 +245,7 @@ func (rci *RowcacheInvalidator) handleUnrecognizedEvent(sql string) {
tableInfo := rci.qe.schemaInfo.GetTable(tableName)
if tableInfo == nil {
log.Errorf("Table %s not found: %s", tableName, sql)
internalErrors.Add("Invalidation", 1)
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
return
}
if tableInfo.CacheType == schema.CACHE_NONE {

View file

@ -99,16 +99,17 @@ type SchemaOverride struct {
// SchemaInfo stores the schema info and performs operations that
// keep itself and the rowcache up-to-date.
type SchemaInfo struct {
mu sync.Mutex
tables map[string]*TableInfo
overrides []SchemaOverride
queries *cache.LRUCache
connPool *ConnPool
cachePool *CachePool
lastChange time.Time
ticks *timer.Timer
reloadTime time.Duration
endpoints map[string]string
mu sync.Mutex
tables map[string]*TableInfo
overrides []SchemaOverride
queries *cache.LRUCache
connPool *ConnPool
cachePool *CachePool
lastChange time.Time
ticks *timer.Timer
reloadTime time.Duration
endpoints map[string]string
queryServiceStats *QueryServiceStats
}
// NewSchemaInfo creates a new SchemaInfo.
@ -118,10 +119,11 @@ func NewSchemaInfo(
endpoints map[string]string,
reloadTime time.Duration,
idleTimeout time.Duration,
enablePublishStats bool) *SchemaInfo {
enablePublishStats bool,
queryServiceStats *QueryServiceStats) *SchemaInfo {
si := &SchemaInfo{
queries: cache.NewLRUCache(int64(queryCacheSize)),
connPool: NewConnPool("", 2, idleTimeout, enablePublishStats),
connPool: NewConnPool("", 2, idleTimeout, enablePublishStats, queryServiceStats),
ticks: timer.NewTimer(reloadTime),
endpoints: endpoints,
reloadTime: reloadTime,
@ -250,7 +252,7 @@ func (si *SchemaInfo) Close() {
// Reload reloads the schema info from the db. Any tables that have changed
// since the last load are updated.
func (si *SchemaInfo) Reload() {
defer logError()
defer logError(si.queryServiceStats)
ctx := context.Background()
// Get time first because it needs a connection from the pool.
curTime := si.mysqlTime(ctx)

View file

@ -32,7 +32,7 @@ func TestSchemaInfoStrictMode(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -57,7 +57,7 @@ func TestSchemaInfoOpenFailedDueToMissMySQLTime(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -81,7 +81,7 @@ func TestSchemaInfoOpenFailedDueToIncorrectMysqlRowNum(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -105,7 +105,7 @@ func TestSchemaInfoOpenFailedDueToInvalidTimeFormat(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -129,7 +129,7 @@ func TestSchemaInfoOpenFailedDueToExecErr(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -159,7 +159,7 @@ func TestSchemaInfoOpenFailedDueToTableInfoErr(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -179,7 +179,7 @@ func TestSchemaInfoOpenWithSchemaOverride(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -209,7 +209,7 @@ func TestSchemaInfoReload(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 10*time.Second, idleTimeout, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
// test cache type RW
@ -292,7 +292,7 @@ func TestSchemaInfoCreateOrUpdateTableFailedDuetoExecErr(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -320,7 +320,7 @@ func TestSchemaInfoCreateOrUpdateTable(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, getSchemaInfoTestSchemaOverride(), cachePool, false)
@ -343,7 +343,7 @@ func TestSchemaInfoDropTable(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, getSchemaInfoTestSchemaOverride(), cachePool, false)
@ -368,7 +368,7 @@ func TestSchemaInfoGetPlanPanicDuetoEmptyQuery(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -395,7 +395,7 @@ func TestSchemaInfoQueryCacheFailDueToInvalidCacheSize(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -419,7 +419,7 @@ func TestSchemaInfoQueryCache(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, true)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(true)
cachePool := newTestSchemaInfoCachePool(true, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -455,7 +455,7 @@ func TestSchemaInfoExportVars(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, true)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(true)
cachePool := newTestSchemaInfoCachePool(true, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, []SchemaOverride{}, cachePool, true)
@ -474,7 +474,7 @@ func TestSchemaInfoStatsURL(t *testing.T) {
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool(false)
cachePool := newTestSchemaInfoCachePool(false, schemaInfo.queryServiceStats)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, []SchemaOverride{}, cachePool, true)
@ -506,7 +506,7 @@ func TestSchemaInfoStatsURL(t *testing.T) {
schemaInfo.ServeHTTP(response, request)
}
func newTestSchemaInfoCachePool(enablePublishStats bool) *CachePool {
func newTestSchemaInfoCachePool(enablePublishStats bool, queryServiceStats *QueryServiceStats) *CachePool {
rowCacheConfig := RowCacheConfig{
Binary: "ls",
Connections: 100,
@ -514,7 +514,14 @@ func newTestSchemaInfoCachePool(enablePublishStats bool) *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-%d-", randID)
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool(name, rowCacheConfig, 1*time.Second, statsURL, enablePublishStats)
return NewCachePool(
name,
rowCacheConfig,
1*time.Second,
statsURL,
enablePublishStats,
queryServiceStats,
)
}
func getSchemaInfoBaseTestQueries() map[string]*mproto.QueryResult {

View file

@ -141,7 +141,7 @@ func (sq *SqlQuery) allowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides
sq.setState(StateInitializing)
sq.mu.Unlock()
c, err := dbconnpool.NewDBConnection(&dbconfigs.App.ConnParams, mysqlStats)
c, err := dbconnpool.NewDBConnection(&dbconfigs.App.ConnParams, sq.qe.queryServiceStats.MySQLStats)
if err != nil {
log.Infof("allowQueries failed: %v", err)
sq.mu.Lock()
@ -267,14 +267,14 @@ func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo
func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
logStats := newSqlQueryStats("Begin", ctx)
logStats.OriginalSql = "begin"
defer handleError(&err, logStats)
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(session.SessionId, false, false); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.txPool.PoolTimeout())
defer func() {
queryStats.Record("BEGIN", time.Now())
sq.qe.queryServiceStats.QueryStats.Record("BEGIN", time.Now())
cancel()
sq.endRequest()
}()
@ -289,14 +289,14 @@ func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err err
logStats := newSqlQueryStats("Commit", ctx)
logStats.OriginalSql = "commit"
logStats.TransactionID = session.TransactionId
defer handleError(&err, logStats)
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(session.SessionId, false, true); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
defer func() {
queryStats.Record("COMMIT", time.Now())
sq.qe.queryServiceStats.QueryStats.Record("COMMIT", time.Now())
cancel()
sq.endRequest()
}()
@ -310,14 +310,14 @@ func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session) (err e
logStats := newSqlQueryStats("Rollback", ctx)
logStats.OriginalSql = "rollback"
logStats.TransactionID = session.TransactionId
defer handleError(&err, logStats)
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(session.SessionId, false, true); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
defer func() {
queryStats.Record("ROLLBACK", time.Now())
sq.qe.queryServiceStats.QueryStats.Record("ROLLBACK", time.Now())
cancel()
sq.endRequest()
}()
@ -334,7 +334,7 @@ func (sq *SqlQuery) handleExecError(query *proto.Query, err *error, logStats *SQ
if !ok {
log.Errorf("Uncaught panic for %v:\n%v\n%s", query, x, tb.Stack(4))
*err = NewTabletError(ErrFail, "%v: uncaught panic for %v", x, query)
internalErrors.Add("Panic", 1)
sq.qe.queryServiceStats.InternalErrors.Add("Panic", 1)
return
}
if sq.config.TerseErrors && terr.SqlError != 0 {
@ -342,7 +342,7 @@ func (sq *SqlQuery) handleExecError(query *proto.Query, err *error, logStats *SQ
} else {
*err = terr
}
terr.RecordStats()
terr.RecordStats(sq.qe.queryServiceStats)
// suppress these errors in logs
if terr.ErrorType == ErrRetry || terr.ErrorType == ErrTxPoolFull || terr.SqlError == mysql.ErrDupEntry {
return
@ -438,7 +438,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
return err
}
defer sq.endRequest()
defer handleError(&err, nil)
defer handleError(&err, nil, sq.qe.queryServiceStats)
beginCalled := false
session := proto.Session{
@ -497,7 +497,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
// SplitQuery splits a BoundQuery into smaller queries that return a subset of rows from the original query.
func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
logStats := newSqlQueryStats("SplitQuery", ctx)
defer handleError(&err, logStats)
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(req.SessionID, false, false); err != nil {
return err
}

View file

@ -961,8 +961,10 @@ func TestHandleExecUnknownError(t *testing.T) {
BindVariables: nil,
}
var err error
sq := &SqlQuery{}
defer sq.handleExecError(&query, &err, logStats)
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
defer sqlQuery.handleExecError(&query, &err, logStats)
panic("unknown exec error")
}
@ -980,8 +982,10 @@ func TestHandleExecTabletError(t *testing.T) {
t.Errorf("Error: %v, want '%s'", err, want)
}
}()
sq := &SqlQuery{}
defer sq.handleExecError(&query, &err, logStats)
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
defer sqlQuery.handleExecError(&query, &err, logStats)
panic(NewTabletError(ErrFatal, "tablet error"))
}
@ -999,9 +1003,11 @@ func TestTerseErrors1(t *testing.T) {
t.Errorf("Error: %v, want '%s'", err, want)
}
}()
sq := &SqlQuery{}
sq.config.TerseErrors = true
defer sq.handleExecError(&query, &err, logStats)
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
sqlQuery.config.TerseErrors = true
defer sqlQuery.handleExecError(&query, &err, logStats)
panic(NewTabletError(ErrFatal, "tablet error"))
}
@ -1019,9 +1025,11 @@ func TestTerseErrors2(t *testing.T) {
t.Errorf("Error: %v, want '%s'", err, want)
}
}()
sq := &SqlQuery{}
sq.config.TerseErrors = true
defer sq.handleExecError(&query, &err, logStats)
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
sqlQuery.config.TerseErrors = true
defer sqlQuery.handleExecError(&query, &err, logStats)
panic(&TabletError{
ErrorType: ErrFail,
Message: "msg",

View file

@ -89,7 +89,7 @@ func (rqsc *realQueryServiceControl) AddStatusPart() {
status := queryserviceStatus{
State: rqsc.sqlQueryRPCService.GetState(),
}
rates := qpsRates.Get()
rates := rqsc.sqlQueryRPCService.qe.queryServiceStats.QPSRates.Get()
if qps, ok := rates["All"]; ok && len(qps) > 0 {
status.CurrentQPS = qps[0]

View file

@ -268,9 +268,9 @@ func newTestTableInfo(cachePool *CachePool, tableType string, comment string) (*
ctx := context.Background()
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
queryServiceStats := NewQueryServiceStats("", false)
connPoolIdleTimeout := 10 * time.Second
connPool := NewConnPool("", 2, connPoolIdleTimeout, false)
connPool := NewConnPool("", 2, connPoolIdleTimeout, false, queryServiceStats)
connPool.Open(&appParams, &dbaParams)
conn, err := connPool.Get(ctx)
if err != nil {
@ -295,7 +295,14 @@ func newTestTableInfoCachePool() *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-TableInfo-%d-", randID)
statsURL := fmt.Sprintf("/debug/tableinfo-cache-%d", randID)
return NewCachePool(name, rowCacheConfig, 1*time.Second, statsURL, false)
return NewCachePool(
name,
rowCacheConfig,
1*time.Second,
statsURL,
false,
NewQueryServiceStats("", false),
)
}
func getTestTableInfoQueries() map[string]*mproto.QueryResult {

View file

@ -146,39 +146,39 @@ func (te *TabletError) Prefix() string {
}
// RecordStats will record the error in the proper stat bucket
func (te *TabletError) RecordStats() {
func (te *TabletError) RecordStats(queryServiceStats *QueryServiceStats) {
switch te.ErrorType {
case ErrRetry:
infoErrors.Add("Retry", 1)
queryServiceStats.InfoErrors.Add("Retry", 1)
case ErrFatal:
infoErrors.Add("Fatal", 1)
queryServiceStats.InfoErrors.Add("Fatal", 1)
case ErrTxPoolFull:
errorStats.Add("TxPoolFull", 1)
queryServiceStats.ErrorStats.Add("TxPoolFull", 1)
case ErrNotInTx:
errorStats.Add("NotInTx", 1)
queryServiceStats.ErrorStats.Add("NotInTx", 1)
default:
switch te.SqlError {
case mysql.ErrDupEntry:
infoErrors.Add("DupKey", 1)
queryServiceStats.InfoErrors.Add("DupKey", 1)
case mysql.ErrLockWaitTimeout, mysql.ErrLockDeadlock:
errorStats.Add("Deadlock", 1)
queryServiceStats.ErrorStats.Add("Deadlock", 1)
default:
errorStats.Add("Fail", 1)
queryServiceStats.ErrorStats.Add("Fail", 1)
}
}
}
func handleError(err *error, logStats *SQLQueryStats) {
func handleError(err *error, logStats *SQLQueryStats, queryServiceStats *QueryServiceStats) {
if x := recover(); x != nil {
terr, ok := x.(*TabletError)
if !ok {
log.Errorf("Uncaught panic:\n%v\n%s", x, tb.Stack(4))
*err = NewTabletError(ErrFail, "%v: uncaught panic", x)
internalErrors.Add("Panic", 1)
queryServiceStats.InternalErrors.Add("Panic", 1)
return
}
*err = terr
terr.RecordStats()
terr.RecordStats(queryServiceStats)
if terr.ErrorType == ErrRetry { // Retry errors are too spammy
return
}
@ -194,12 +194,12 @@ func handleError(err *error, logStats *SQLQueryStats) {
}
}
func logError() {
func logError(queryServiceStats *QueryServiceStats) {
if x := recover(); x != nil {
terr, ok := x.(*TabletError)
if !ok {
log.Errorf("Uncaught panic:\n%v\n%s", x, tb.Stack(4))
internalErrors.Add("Panic", 1)
queryServiceStats.InternalErrors.Add("Panic", 1)
return
}
if terr.ErrorType == ErrTxPoolFull {

View file

@ -95,65 +95,66 @@ func TestTabletErrorPrefix(t *testing.T) {
func TestTabletErrorRecordStats(t *testing.T) {
tabletErr := NewTabletErrorSql(ErrRetry, sqldb.NewSqlError(2000, "test"))
retryCounterBefore := infoErrors.Counts()["Retry"]
tabletErr.RecordStats()
retryCounterAfter := infoErrors.Counts()["Retry"]
queryServiceStats := NewQueryServiceStats("", false)
retryCounterBefore := queryServiceStats.InfoErrors.Counts()["Retry"]
tabletErr.RecordStats(queryServiceStats)
retryCounterAfter := queryServiceStats.InfoErrors.Counts()["Retry"]
if retryCounterAfter-retryCounterBefore != 1 {
t.Fatalf("tablet error with error type ErrRetry should increase Retry error count by 1")
}
tabletErr = NewTabletErrorSql(ErrFatal, sqldb.NewSqlError(2000, "test"))
fatalCounterBefore := infoErrors.Counts()["Fatal"]
tabletErr.RecordStats()
fatalCounterAfter := infoErrors.Counts()["Fatal"]
fatalCounterBefore := queryServiceStats.InfoErrors.Counts()["Fatal"]
tabletErr.RecordStats(queryServiceStats)
fatalCounterAfter := queryServiceStats.InfoErrors.Counts()["Fatal"]
if fatalCounterAfter-fatalCounterBefore != 1 {
t.Fatalf("tablet error with error type ErrFatal should increase Fatal error count by 1")
}
tabletErr = NewTabletErrorSql(ErrTxPoolFull, sqldb.NewSqlError(2000, "test"))
txPoolFullCounterBefore := errorStats.Counts()["TxPoolFull"]
tabletErr.RecordStats()
txPoolFullCounterAfter := errorStats.Counts()["TxPoolFull"]
txPoolFullCounterBefore := queryServiceStats.ErrorStats.Counts()["TxPoolFull"]
tabletErr.RecordStats(queryServiceStats)
txPoolFullCounterAfter := queryServiceStats.ErrorStats.Counts()["TxPoolFull"]
if txPoolFullCounterAfter-txPoolFullCounterBefore != 1 {
t.Fatalf("tablet error with error type ErrTxPoolFull should increase TxPoolFull error count by 1")
}
tabletErr = NewTabletErrorSql(ErrNotInTx, sqldb.NewSqlError(2000, "test"))
notInTxCounterBefore := errorStats.Counts()["NotInTx"]
tabletErr.RecordStats()
notInTxCounterAfter := errorStats.Counts()["NotInTx"]
notInTxCounterBefore := queryServiceStats.ErrorStats.Counts()["NotInTx"]
tabletErr.RecordStats(queryServiceStats)
notInTxCounterAfter := queryServiceStats.ErrorStats.Counts()["NotInTx"]
if notInTxCounterAfter-notInTxCounterBefore != 1 {
t.Fatalf("tablet error with error type ErrNotInTx should increase NotInTx error count by 1")
}
tabletErr = NewTabletErrorSql(ErrFail, sqldb.NewSqlError(mysql.ErrDupEntry, "test"))
dupKeyCounterBefore := infoErrors.Counts()["DupKey"]
tabletErr.RecordStats()
dupKeyCounterAfter := infoErrors.Counts()["DupKey"]
dupKeyCounterBefore := queryServiceStats.InfoErrors.Counts()["DupKey"]
tabletErr.RecordStats(queryServiceStats)
dupKeyCounterAfter := queryServiceStats.InfoErrors.Counts()["DupKey"]
if dupKeyCounterAfter-dupKeyCounterBefore != 1 {
t.Fatalf("sql error with error type mysql.ErrDupEntry should increase DupKey error count by 1")
}
tabletErr = NewTabletErrorSql(ErrFail, sqldb.NewSqlError(mysql.ErrLockWaitTimeout, "test"))
lockWaitTimeoutCounterBefore := errorStats.Counts()["Deadlock"]
tabletErr.RecordStats()
lockWaitTimeoutCounterAfter := errorStats.Counts()["Deadlock"]
lockWaitTimeoutCounterBefore := queryServiceStats.ErrorStats.Counts()["Deadlock"]
tabletErr.RecordStats(queryServiceStats)
lockWaitTimeoutCounterAfter := queryServiceStats.ErrorStats.Counts()["Deadlock"]
if lockWaitTimeoutCounterAfter-lockWaitTimeoutCounterBefore != 1 {
t.Fatalf("sql error with error type mysql.ErrLockWaitTimeout should increase Deadlock error count by 1")
}
tabletErr = NewTabletErrorSql(ErrFail, sqldb.NewSqlError(mysql.ErrLockDeadlock, "test"))
deadlockCounterBefore := errorStats.Counts()["Deadlock"]
tabletErr.RecordStats()
deadlockCounterAfter := errorStats.Counts()["Deadlock"]
deadlockCounterBefore := queryServiceStats.ErrorStats.Counts()["Deadlock"]
tabletErr.RecordStats(queryServiceStats)
deadlockCounterAfter := queryServiceStats.ErrorStats.Counts()["Deadlock"]
if deadlockCounterAfter-deadlockCounterBefore != 1 {
t.Fatalf("sql error with error type mysql.ErrLockDeadlock should increase Deadlock error count by 1")
}
tabletErr = NewTabletErrorSql(ErrFail, sqldb.NewSqlError(mysql.ErrOptionPreventsStatement, "test"))
failCounterBefore := errorStats.Counts()["Fail"]
tabletErr.RecordStats()
failCounterAfter := errorStats.Counts()["Fail"]
failCounterBefore := queryServiceStats.ErrorStats.Counts()["Fail"]
tabletErr.RecordStats(queryServiceStats)
failCounterAfter := queryServiceStats.ErrorStats.Counts()["Fail"]
if failCounterAfter-failCounterBefore != 1 {
t.Fatalf("sql error with error type mysql.ErrOptionPreventsStatement should increase Fail error count by 1")
}
@ -162,13 +163,14 @@ func TestTabletErrorRecordStats(t *testing.T) {
func TestTabletErrorHandleUncaughtError(t *testing.T) {
var err error
logStats := newSqlQueryStats("TestTabletErrorHandleError", context.Background())
queryServiceStats := NewQueryServiceStats("", false)
defer func() {
_, ok := err.(*TabletError)
if !ok {
t.Fatalf("error should be a TabletError, but got error: %v", err)
}
}()
defer handleError(&err, logStats)
defer handleError(&err, logStats, queryServiceStats)
panic("unknown error")
}
@ -176,13 +178,14 @@ func TestTabletErrorHandleRetryError(t *testing.T) {
var err error
tabletErr := NewTabletErrorSql(ErrRetry, sqldb.NewSqlError(1000, "test"))
logStats := newSqlQueryStats("TestTabletErrorHandleError", context.Background())
queryServiceStats := NewQueryServiceStats("", false)
defer func() {
_, ok := err.(*TabletError)
if !ok {
t.Fatalf("error should be a TabletError, but got error: %v", err)
}
}()
defer handleError(&err, logStats)
defer handleError(&err, logStats, queryServiceStats)
panic(tabletErr)
}
@ -190,48 +193,52 @@ func TestTabletErrorHandleTxPoolFullError(t *testing.T) {
var err error
tabletErr := NewTabletErrorSql(ErrTxPoolFull, sqldb.NewSqlError(1000, "test"))
logStats := newSqlQueryStats("TestTabletErrorHandleError", context.Background())
queryServiceStats := NewQueryServiceStats("", false)
defer func() {
_, ok := err.(*TabletError)
if !ok {
t.Fatalf("error should be a TabletError, but got error: %v", err)
}
}()
defer handleError(&err, logStats)
defer handleError(&err, logStats, queryServiceStats)
panic(tabletErr)
}
func TestTabletErrorLogUncaughtErr(t *testing.T) {
panicCountBefore := internalErrors.Counts()["Panic"]
queryServiceStats := NewQueryServiceStats("", false)
panicCountBefore := queryServiceStats.InternalErrors.Counts()["Panic"]
defer func() {
panicCountAfter := internalErrors.Counts()["Panic"]
panicCountAfter := queryServiceStats.InternalErrors.Counts()["Panic"]
if panicCountAfter-panicCountBefore != 1 {
t.Fatalf("Panic count should increase by 1 for uncaught panic")
}
}()
defer logError()
defer logError(queryServiceStats)
panic("unknown error")
}
func TestTabletErrorTxPoolFull(t *testing.T) {
tabletErr := NewTabletErrorSql(ErrTxPoolFull, sqldb.NewSqlError(1000, "test"))
queryServiceStats := NewQueryServiceStats("", false)
defer func() {
err := recover()
if err != nil {
t.Fatalf("error should have been handled already")
}
}()
defer logError()
defer logError(queryServiceStats)
panic(tabletErr)
}
func TestTabletErrorFatal(t *testing.T) {
tabletErr := NewTabletErrorSql(ErrFatal, sqldb.NewSqlError(1000, "test"))
queryServiceStats := NewQueryServiceStats("", false)
defer func() {
err := recover()
if err != nil {
t.Fatalf("error should have been handled already")
}
}()
defer logError()
defer logError(queryServiceStats)
panic(tabletErr)
}

View file

@ -128,15 +128,27 @@ func (util *testUtils) newQueryServiceConfig() Config {
return config
}
func (util *testUtils) newConnPool() *ConnPool {
return NewConnPool(
"ConnPool",
100,
10*time.Second,
false,
NewQueryServiceStats("", false),
)
}
func newTestSchemaInfo(
queryCacheSize int,
reloadTime time.Duration,
idleTimeout time.Duration,
enablePublishStats bool) *SchemaInfo {
randID := rand.Int63()
name := fmt.Sprintf("TestSchemaInfo-%d-", randID)
queryServiceStats := NewQueryServiceStats(name, enablePublishStats)
return NewSchemaInfo(
queryCacheSize,
fmt.Sprintf("TestSchemaInfo-%d-", randID),
name,
map[string]string{
debugQueryPlansKey: fmt.Sprintf("/debug/query_plans_%d", randID),
debugQueryStatsKey: fmt.Sprintf("/debug/query_stats_%d", randID),
@ -146,5 +158,6 @@ func newTestSchemaInfo(
reloadTime,
idleTimeout,
enablePublishStats,
queryServiceStats,
)
}
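
Production wiring follows the same pattern as these test helpers: the query engine builds one QueryServiceStats and hands the same pointer to every pool it owns. A minimal sketch in package tabletserver, using only the NewConnPool signature visible in newConnPool above; the helper name newEnginePools, the "Vttablet" prefix, the capacity, and the timeout are illustrative, not taken from this commit.

// Sketch: one shared stats holder injected into each dependent pool.
func newEnginePools() (*QueryServiceStats, *ConnPool) {
	qStats := NewQueryServiceStats("Vttablet", true)
	connPool := NewConnPool("ConnPool", 100, 10*time.Second, true, qStats)
	return qStats, connPool
}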

View file

@ -45,14 +45,14 @@ const txLogInterval = time.Duration(1 * time.Minute)
// TxPool is the transaction pool for the query service.
type TxPool struct {
pool *ConnPool
activePool *pools.Numbered
lastID sync2.AtomicInt64
timeout sync2.AtomicDuration
poolTimeout sync2.AtomicDuration
ticks *timer.Timer
txStats *stats.Timings
pool *ConnPool
activePool *pools.Numbered
lastID sync2.AtomicInt64
timeout sync2.AtomicDuration
poolTimeout sync2.AtomicDuration
ticks *timer.Timer
txStats *stats.Timings
queryServiceStats *QueryServiceStats
// Tracking culprits that cause tx pool full errors.
logMu sync.Mutex
lastLog time.Time
@ -66,15 +66,23 @@ func NewTxPool(
timeout time.Duration,
poolTimeout time.Duration,
idleTimeout time.Duration,
enablePublishStats bool) *TxPool {
enablePublishStats bool,
qStats *QueryServiceStats) *TxPool {
txStatsName := ""
if enablePublishStats {
txStatsName = txStatsPrefix + "Transactions"
}
axp := &TxPool{
pool: NewConnPool(name, capacity, idleTimeout, enablePublishStats),
activePool: pools.NewNumbered(),
lastID: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
poolTimeout: sync2.AtomicDuration(poolTimeout),
ticks: timer.NewTimer(timeout / 10),
txStats: stats.NewTimings(txStatsPrefix + "Transactions"),
pool: NewConnPool(name, capacity, idleTimeout, enablePublishStats, qStats),
activePool: pools.NewNumbered(),
lastID: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
poolTimeout: sync2.AtomicDuration(poolTimeout),
ticks: timer.NewTimer(timeout / 10),
txStats: stats.NewTimings(txStatsName),
queryServiceStats: qStats,
}
// Careful: pool also exports name+"xxx" vars,
// but we know it doesn't export Timeout.
@ -99,7 +107,7 @@ func (axp *TxPool) Close() {
for _, v := range axp.activePool.GetOutdated(time.Duration(0), "for closing") {
conn := v.(*TxConnection)
log.Warningf("killing transaction for shutdown: %s", conn.Format(nil))
internalErrors.Add("StrayTransactions", 1)
axp.queryServiceStats.InternalErrors.Add("StrayTransactions", 1)
conn.Close()
conn.discard(TxClose)
}
@ -112,11 +120,11 @@ func (axp *TxPool) WaitForEmpty() {
}
func (axp *TxPool) transactionKiller() {
defer logError()
defer logError(axp.queryServiceStats)
for _, v := range axp.activePool.GetOutdated(time.Duration(axp.Timeout()), "for rollback") {
conn := v.(*TxConnection)
log.Warningf("killing transaction (exceeded timeout: %v): %s", axp.Timeout(), conn.Format(nil))
killStats.Add("Transactions", 1)
axp.queryServiceStats.KillStats.Add("Transactions", 1)
conn.Close()
conn.discard(TxKill)
}
@ -155,7 +163,7 @@ func (axp *TxPool) Begin(ctx context.Context) int64 {
// returns an error on failure instead of panic. The connection becomes free
// and can be reused in the future.
func (axp *TxPool) SafeCommit(ctx context.Context, transactionID int64) (invalidList map[string]DirtyKeys, err error) {
defer handleError(&err, nil)
defer handleError(&err, nil, axp.queryServiceStats)
conn := axp.Get(transactionID)
defer conn.discard(TxCommit)

View file

@ -12,7 +12,6 @@ import (
"github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/tabletserver/fakesqldb"
"golang.org/x/net/context"
)
@ -79,14 +78,14 @@ func TestTransactionKiller(t *testing.T) {
txPool.Open(&appParams, &dbaParams)
defer txPool.Close()
ctx := context.Background()
killCount := killStats.Counts()["Transactions"]
killCount := txPool.queryServiceStats.KillStats.Counts()["Transactions"]
transactionID := txPool.Begin(ctx)
txConn := txPool.Get(transactionID)
txConn.RecordQuery(sql)
txConn.Recycle()
// transaction killer should kill the query
txPool.WaitForEmpty()
killCountDiff := killStats.Counts()["Transactions"] - killCount
killCountDiff := txPool.queryServiceStats.KillStats.Counts()["Transactions"] - killCount
if killCountDiff != 1 {
t.Fatalf("query: %s should be killed by transaction killer", sql)
}
@ -261,6 +260,7 @@ func newTxPool(enablePublishStats bool) *TxPool {
transactionTimeout := time.Duration(30 * time.Second)
txPoolTimeout := time.Duration(30 * time.Second)
idleTimeout := time.Duration(30 * time.Second)
queryServiceStats := NewQueryServiceStats("", enablePublishStats)
return NewTxPool(
poolName,
txStatsPrefix,
@ -269,20 +269,6 @@ func newTxPool(enablePublishStats bool) *TxPool {
txPoolTimeout,
idleTimeout,
enablePublishStats,
queryServiceStats,
)
}
func init() {
rand.Seed(time.Now().UnixNano())
name := "TestTxPool"
mysqlStats = stats.NewTimings(name + "Mysql")
queryStats = stats.NewTimings(name + "Queries")
waitStats = stats.NewTimings(name + "Waits")
killStats = stats.NewCounters(name + "Kills")
infoErrors = stats.NewCounters(name + "InfoErrors")
errorStats = stats.NewCounters(name + "Errors")
internalErrors = stats.NewCounters(name + "InternalErrors")
resultStats = stats.NewHistogram(name+"Results", resultBuckets)
spotCheckCount = stats.NewInt(name + "RowcacheSpotCheckCount")
qpsRates = stats.NewRates(name+"QPS", queryStats, 15, 60*time.Second)
}
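
With the package-level counters and their init registration removed above, a test that wants to assert on a counter now builds its own isolated holder instead of sharing (and, presumably, re-registering) globals. A short sketch mirroring the RecordStats test earlier in this diff; the test name is hypothetical, but every identifier it calls appears in these hunks.

func TestFailCounterIsolated(t *testing.T) {
	// Empty prefix and enablePublishStats=false: nothing is registered with expvar,
	// so counters never collide across tests.
	qStats := NewQueryServiceStats("", false)
	before := qStats.ErrorStats.Counts()["Fail"]
	tabletErr := NewTabletErrorSql(ErrFail, sqldb.NewSqlError(mysql.ErrOptionPreventsStatement, "test"))
	tabletErr.RecordStats(qStats)
	if qStats.ErrorStats.Counts()["Fail"]-before != 1 {
		t.Fatalf("Fail counter should increase by 1")
	}
}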