stats & diagnostics: query_stats and health checks

Sugu Sougoumarane 2013-01-28 13:33:04 -08:00
Parent 136063d078
Commit 3c030c757d
8 changed files with 210 additions and 115 deletions

View file

@@ -18,7 +18,6 @@ type PlanType int
 const (
     PLAN_PASS_SELECT PlanType = iota
     PLAN_PASS_DML
-    PLAN_SELECT_CACHE_RESULT
     PLAN_SELECT_PK
     PLAN_SELECT_SUBQUERY
     PLAN_DML_PK
@@ -27,28 +26,33 @@ const (
     PLAN_INSERT_SUBQUERY
     PLAN_SET
     PLAN_DDL
+    NumPlans
 )
 
-func (pt PlanType) IsSelect() bool {
-    return pt == PLAN_PASS_SELECT || pt == PLAN_SELECT_CACHE_RESULT || pt == PLAN_SELECT_PK || pt == PLAN_SELECT_SUBQUERY
-}
-
-var planName = map[PlanType]string{
-    PLAN_PASS_SELECT: "PASS_SELECT",
-    PLAN_PASS_DML: "PASS_DML",
-    PLAN_SELECT_CACHE_RESULT: "SELECT_CACHE_RESULT",
-    PLAN_SELECT_PK: "SELECT_PK",
-    PLAN_SELECT_SUBQUERY: "SELECT_SUBQUERY",
-    PLAN_DML_PK: "DML_PK",
-    PLAN_DML_SUBQUERY: "DML_SUBQUERY",
-    PLAN_INSERT_PK: "INSERT_PK",
-    PLAN_INSERT_SUBQUERY: "INSERT_SUBQUERY",
-    PLAN_SET: "SET",
-    PLAN_DDL: "DDL",
+// Must exactly match order of plan constants.
+var planName = []string{
+    "PASS_SELECT",
+    "PASS_DML",
+    "SELECT_PK",
+    "SELECT_SUBQUERY",
+    "DML_PK",
+    "DML_SUBQUERY",
+    "INSERT_PK",
+    "INSERT_SUBQUERY",
+    "SET",
+    "DDL",
+}
+
+func (pt PlanType) String() string {
+    return planName[pt]
+}
+
+func (pt PlanType) IsSelect() bool {
+    return pt == PLAN_PASS_SELECT || pt == PLAN_SELECT_PK || pt == PLAN_SELECT_SUBQUERY
 }
 
 func (pt PlanType) MarshalJSON() ([]byte, error) {
-    return ([]byte)(fmt.Sprintf("\"%s\"", planName[pt])), nil
+    return ([]byte)(fmt.Sprintf("\"%s\"", pt.String())), nil
 }
 
 type ReasonType int
@@ -69,24 +73,29 @@ const (
     REASON_HAS_HINTS
 )
 
-var reasonName = map[ReasonType]string{
-    REASON_DEFAULT: "DEFAULT",
-    REASON_SELECT: "SELECT",
-    REASON_TABLE: "TABLE",
-    REASON_NOCACHE: "NOCACHE",
-    REASON_SELECT_LIST: "SELECT_LIST",
-    REASON_FOR_UPDATE: "FOR_UPDATE",
-    REASON_WHERE: "WHERE",
-    REASON_ORDER: "ORDER",
-    REASON_PKINDEX: "PKINDEX",
-    REASON_NOINDEX_MATCH: "NOINDEX_MATCH",
-    REASON_TABLE_NOINDEX: "TABLE_NOINDEX",
-    REASON_PK_CHANGE: "PK_CHANGE",
-    REASON_HAS_HINTS: "HAS_HINTS",
+// Must exactly match order of reason constants.
+var reasonName = []string{
+    "DEFAULT",
+    "SELECT",
+    "TABLE",
+    "NOCACHE",
+    "SELECT_LIST",
+    "FOR_UPDATE",
+    "WHERE",
+    "ORDER",
+    "PKINDEX",
+    "NOINDEX_MATCH",
+    "TABLE_NOINDEX",
+    "PK_CHANGE",
+    "HAS_HINTS",
+}
+
+func (rt ReasonType) String() string {
+    return reasonName[rt]
 }
 
 func (rt ReasonType) MarshalJSON() ([]byte, error) {
-    return ([]byte)(fmt.Sprintf("\"%s\"", reasonName[rt])), nil
+    return ([]byte)(fmt.Sprintf("\"%s\"", rt.String())), nil
 }
 
 // ExecPlan is built for selects and DMLs.
@@ -259,10 +268,7 @@ func (node *Node) execAnalyzeSelect(getTable TableGetter) (plan *ExecPlan) {
         plan.Reason = REASON_SELECT_LIST
         return plan
     }
-    // The plan has improved
-    plan.PlanId = PLAN_SELECT_CACHE_RESULT
     plan.ColumnNumbers = selects
-    plan.OuterQuery = node.GenerateDefaultQuery(tableInfo)
 
     // where
     conditions := node.At(SELECT_WHERE_OFFSET).execAnalyzeWhere()
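
Since this change turns PlanType.String() into a plain slice index, the string table has to track the constant order exactly; that is what the NumPlans sentinel at the end of the const block is for. A minimal standalone sketch of the same pattern (toy declarations mirroring the idea above, not the full Vitess file) that also shows the kind of sanity check the sentinel enables:

package main

import "fmt"

type PlanType int

const (
    PLAN_PASS_SELECT PlanType = iota
    PLAN_PASS_DML
    PLAN_SELECT_PK
    NumPlans // sentinel: always the last constant
)

// Must exactly match the order of the constants above.
var planName = []string{
    "PASS_SELECT",
    "PASS_DML",
    "SELECT_PK",
}

func (pt PlanType) String() string {
    return planName[pt]
}

func main() {
    // Cheap guard against the slice and the constants drifting apart.
    if len(planName) != int(NumPlans) {
        panic("planName out of sync with PlanType constants")
    }
    fmt.Println(PLAN_SELECT_PK) // prints SELECT_PK via the String method
}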

View file

@@ -77,12 +77,12 @@ select * from a having b=1
 # limit
 select * from a limit 5
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a limit 5",
-  "OuterQuery": "select eid, id, name, foo from a limit 5",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -196,12 +196,12 @@ select eid+1 from a
 # simple
 select eid from a
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select eid from a where 1 != 1",
   "FullQuery": "select eid from a limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -217,12 +217,12 @@ select eid from a
 # *
 select * from a
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -241,12 +241,12 @@ select * from a
 # c.eid
 select c.eid from a as c
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select c.eid from a as c where 1 != 1",
   "FullQuery": "select c.eid from a as c limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a as c limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -262,12 +262,12 @@ select c.eid from a as c
 # (eid)
 select (eid) from a as c
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select eid from a as c where 1 != 1",
   "FullQuery": "select eid from a as c limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a as c limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -326,12 +326,12 @@ select * from a where eid=1
 # complex where (expression)
 select * from a where eid+1 = 1
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid+1 = 1 limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid+1 = 1 limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -350,12 +350,12 @@ select * from a where eid+1 = 1
 # complex where (non-value operand)
 select * from a where eid = id
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid = id limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid = id limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -374,12 +374,12 @@ select * from a where eid = id
 # and
 select * from a where eid=1 and foo='b'
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "PKINDEX",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid = 1 and foo = 'b' limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid = 1 and foo = 'b' limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "PRIMARY",
   "ColumnNumbers": [
@@ -562,12 +562,12 @@ select * from a where eid=1 and id in (1)
 # double pk IN
 select * from a where eid in (1) and id in (1, 2)
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid in (1) and id in (1, 2) limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid in (1) and id in (1, 2) limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -586,12 +586,12 @@ select * from a where eid in (1) and id in (1, 2)
 # double pk IN 2
 select * from a where eid in (1, 2) and id in (1, 2)
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid in (1, 2) and id in (1, 2) limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid in (1, 2) and id in (1, 2) limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -672,12 +672,12 @@ select * from a where eid=1 and id in (:a)
 # inequality on pk columns
 select * from a where eid=1 and id>1
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "PKINDEX",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid = 1 and id \u003e 1 limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid = 1 and id \u003e 1 limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "PRIMARY",
   "ColumnNumbers": [
@@ -744,12 +744,12 @@ select * from a where eid=1 and name='foo' limit 10
 # no index match
 select * from a where foo='bar'
 {
-  "PlanId":"SELECT_CACHE_RESULT",
+  "PlanId":"PASS_SELECT",
   "Reason":"NOINDEX_MATCH",
   "TableName":"a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery":"select * from a where foo = 'bar' limit :_vtMaxResultSize",
-  "OuterQuery":"select eid, id, name, foo from a where foo = 'bar' limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery":null,
   "IndexUsed":"",
   "ColumnNumbers":[
@@ -864,12 +864,12 @@ select * from a where eid in (1, 2) and name='foo'
 # non-pk IN non-value operand
 select * from a where eid in (1, id) and name='foo'
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "WHERE",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid in (1, id) and name = 'foo' limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid in (1, id) and name = 'foo' limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [
@@ -912,12 +912,12 @@ select * from a where eid between 1 and 2 and name='foo'
 # order by (pk)
 select * from a where eid=1 and id=1 order by name
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "PKINDEX",
   "TableName": "a",
   "FieldQuery": "select * from a where 1 != 1",
   "FullQuery": "select * from a where eid = 1 and id = 1 order by name asc limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a where eid = 1 and id = 1 order by name asc limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "PRIMARY",
   "ColumnNumbers": [
@@ -984,12 +984,12 @@ select * from a where name='foo'
 # index override
 select * from a use index(a_name) where eid = 1
 {
-  "PlanId": "SELECT_CACHE_RESULT",
+  "PlanId": "PASS_SELECT",
   "Reason": "HAS_HINTS",
   "TableName": "a",
   "FieldQuery": "select * from a use index (a_name) where 1 != 1",
   "FullQuery": "select * from a use index (a_name) where eid = 1 limit :_vtMaxResultSize",
-  "OuterQuery": "select eid, id, name, foo from a use index (a_name) where eid = 1 limit :_vtMaxResultSize",
+  "OuterQuery": null,
   "Subquery": null,
   "IndexUsed": "",
   "ColumnNumbers": [

View file

@@ -202,8 +202,19 @@ func (qe *QueryEngine) Execute(logStats *sqlQueryStats, query *proto.Query) (rep
     // cheap hack: strip trailing comment into a special bind var
     stripTrailing(query)
     basePlan := qe.schemaInfo.GetPlan(logStats, query.Sql)
+    planName := basePlan.PlanId.String()
+    logStats.PlanType = planName
+    defer func(start time.Time) {
+        duration := time.Now().Sub(start)
+        queryStats.Add(planName, duration)
+        if reply == nil {
+            basePlan.AddStats(1, duration, 0, 1)
+        } else {
+            basePlan.AddStats(1, duration, int64(len(reply.Rows)), 0)
+        }
+    }(time.Now())
     if basePlan.PlanId == sqlparser.PLAN_DDL {
-        defer queryStats.Record("DDL", time.Now())
         return qe.execDDL(logStats, query.Sql)
     }
@@ -221,40 +232,24 @@ func (qe *QueryEngine) Execute(logStats *sqlQueryStats, query *proto.Query) (rep
             if plan.TableInfo != nil && plan.TableInfo.CacheType != 0 {
                 panic(NewTabletError(FAIL, "DML too complex for cached table"))
             }
-            logStats.PlanType = "PASS_DML"
-            defer queryStats.Record("PASS_DML", time.Now())
             reply = qe.directFetch(logStats, conn, plan.FullQuery, plan.BindVars, nil, nil)
         case sqlparser.PLAN_INSERT_PK:
-            logStats.PlanType = "INSERT_PK"
-            defer queryStats.Record("INSERT_PK", time.Now())
             reply = qe.execInsertPK(logStats, conn, plan, invalidator)
         case sqlparser.PLAN_INSERT_SUBQUERY:
-            logStats.PlanType = "INSERT_SUBQUERY"
-            defer queryStats.Record("INSERT_SUBQUERY", time.Now())
             reply = qe.execInsertSubquery(logStats, conn, plan, invalidator)
         case sqlparser.PLAN_DML_PK:
-            logStats.PlanType = "DML_PK"
-            defer queryStats.Record("DML_PK", time.Now())
             reply = qe.execDMLPK(logStats, conn, plan, invalidator)
        case sqlparser.PLAN_DML_SUBQUERY:
-            logStats.PlanType = "DML_SUBQUERY"
-            defer queryStats.Record("DML_SUBQUERY", time.Now())
             reply = qe.execDMLSubquery(logStats, conn, plan, invalidator)
         default: // select or set in a transaction, just count as select
-            logStats.PlanType = "PASS_SELECT"
-            defer queryStats.Record("PASS_SELECT", time.Now())
             reply = qe.execDirect(logStats, plan, conn)
         }
     } else if plan.ConnectionId != 0 {
         conn := qe.reservedPool.Get(plan.ConnectionId)
         defer conn.Recycle()
         if plan.PlanId.IsSelect() {
-            logStats.PlanType = "PASS_SELECT"
-            defer queryStats.Record("PASS_SELECT", time.Now())
             reply = qe.execDirect(logStats, plan, conn)
         } else if plan.PlanId == sqlparser.PLAN_SET {
-            logStats.PlanType = "SET"
-            defer queryStats.Record("SET", time.Now())
             reply = qe.directFetch(logStats, conn, plan.FullQuery, plan.BindVars, nil, nil)
         } else {
             panic(NewTabletError(FAIL, "DMLs not allowed outside of transactions"))
@@ -265,29 +260,16 @@ func (qe *QueryEngine) Execute(logStats *sqlQueryStats, query *proto.Query) (rep
             if plan.Reason == sqlparser.REASON_FOR_UPDATE {
                 panic(NewTabletError(FAIL, "Disallowed outside transaction"))
             }
-            logStats.PlanType = "PASS_SELECT"
-            defer queryStats.Record("PASS_SELECT", time.Now())
             reply = qe.execSelect(logStats, plan)
         case sqlparser.PLAN_SELECT_PK:
-            logStats.PlanType = "SELECT_PK"
-            defer queryStats.Record("SELECT_PK", time.Now())
             reply = qe.execPK(logStats, plan)
         case sqlparser.PLAN_SELECT_SUBQUERY:
-            logStats.PlanType = "SELECT_SUBQUERY"
-            defer queryStats.Record("SELECT_SUBQUERY", time.Now())
             reply = qe.execSubquery(logStats, plan)
-        case sqlparser.PLAN_SELECT_CACHE_RESULT:
-            logStats.PlanType = "SELECT_CACHE_RESULT"
-            defer queryStats.Record("SELECT_CACHE_RESULT", time.Now())
-            // It may not be worth caching the results. So, just pass through.
-            reply = qe.execSelect(logStats, plan)
         case sqlparser.PLAN_SET:
             waitingForConnectionStart := time.Now()
             conn := qe.connPool.Get()
             logStats.WaitingForConnection += time.Now().Sub(waitingForConnectionStart)
             defer conn.Recycle()
-            logStats.PlanType = "SET"
-            defer queryStats.Record("SET", time.Now())
             reply = qe.execSet(logStats, conn, plan)
         default:
             panic(NewTabletError(FAIL, "DMLs not allowed outside of transactions"))
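
The per-case logStats/queryStats boilerplate collapses into a single deferred closure at the top of Execute: the plan name and start time are captured once, and on return the closure records the elapsed time against that plan, treating a nil reply as an error. A reduced sketch of that defer-with-argument idiom, with a hypothetical recordStats standing in for queryStats.Add plus ExecPlan.AddStats:

package main

import (
    "fmt"
    "time"
)

// recordStats is a stand-in for the real counters in the commit
// (queryStats.Add and basePlan.AddStats); it just prints what would be recorded.
func recordStats(planName string, d time.Duration, rows, errors int64) {
    fmt.Printf("%s: %v rows=%d errors=%d\n", planName, d, rows, errors)
}

func execute(planName string) (reply []string) {
    // time.Now() is evaluated here, when the defer is set up; the closure body
    // runs at return, after the named result `reply` has its final value.
    defer func(start time.Time) {
        duration := time.Since(start)
        if reply == nil {
            recordStats(planName, duration, 0, 1) // failed query
        } else {
            recordStats(planName, duration, int64(len(reply)), 0)
        }
    }(time.Now())

    time.Sleep(10 * time.Millisecond) // pretend to run the query
    return []string{"row1", "row2"}
}

func main() {
    execute("PASS_SELECT")
}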

View file

@@ -5,7 +5,11 @@
 package tabletserver
 
 import (
+    "net/http"
+
+    mproto "code.google.com/p/vitess/go/mysql/proto"
     "code.google.com/p/vitess/go/relog"
+    rpcproto "code.google.com/p/vitess/go/rpcwrap/proto"
     "code.google.com/p/vitess/go/vt/dbconfigs"
     "code.google.com/p/vitess/go/vt/tabletserver/proto"
 )
@@ -33,6 +37,7 @@ func RegisterQueryService(config Config) {
     }
     SqlQueryRpcService = NewSqlQuery(config)
     proto.RegisterAuthenticated(SqlQueryRpcService)
+    http.HandleFunc("/debug/health", healthCheck)
 }
 
 // AllowQueries can take an indefinite amount of time to return because
@@ -79,3 +84,16 @@ func InvalidateForDml(cacheInvalidate *proto.CacheInvalidate) {
 func InvalidateForDDL(ddlInvalidate *proto.DDLInvalidate) {
     SqlQueryRpcService.qe.InvalidateForDDL(ddlInvalidate)
 }
+
+func healthCheck(w http.ResponseWriter, r *http.Request) {
+    w.Header().Set("Content-Type", "text/plain")
+    err := SqlQueryRpcService.Execute(
+        new(rpcproto.Context),
+        &proto.Query{Sql: "select 1 from dual", SessionId: SqlQueryRpcService.sessionId},
+        new(mproto.QueryResult),
+    )
+    if err != nil {
+        w.Write([]byte("notok"))
+    }
+    w.Write([]byte("ok"))
+}
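
healthCheck runs "select 1 from dual" through the regular Execute path, so an "ok" body means the query service can actually serve queries, not merely that the HTTP listener is up. Note that as committed the handler has no early return, so on failure it writes "notok" followed by "ok"; a checker should therefore look at the prefix of the body. A client-side sketch under that assumption (the host and port are the test environment's, not anything fixed by this commit):

package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

func main() {
    // 9461 is the port the Python test environment below points at.
    resp, err := http.Get("http://localhost:9461/debug/health")
    if err != nil {
        fmt.Println("unreachable:", err)
        return
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    // On failure the handler writes "notok" before "ok", so test the prefix.
    if strings.HasPrefix(string(body), "notok") {
        fmt.Println("unhealthy")
        return
    }
    fmt.Println("healthy")
}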

View file

@@ -29,12 +29,37 @@ type ExecPlan struct {
     *sqlparser.ExecPlan
     TableInfo *TableInfo
     Fields []mproto.Field
+
+    mu sync.Mutex
+    QueryCount int64
+    Time time.Duration
+    RowCount int64
+    ErrorCount int64
 }
 
 func (*ExecPlan) Size() int {
     return 1
 }
 
+func (ep *ExecPlan) AddStats(queryCount int64, duration time.Duration, rowCount, errorCount int64) {
+    ep.mu.Lock()
+    ep.QueryCount += queryCount
+    ep.Time += duration
+    ep.RowCount += rowCount
+    ep.ErrorCount += errorCount
+    ep.mu.Unlock()
+}
+
+func (ep *ExecPlan) Stats() (queryCount int64, duration time.Duration, rowCount, errorCount int64) {
+    ep.mu.Lock()
+    queryCount = ep.QueryCount
+    duration = ep.Time
+    rowCount = ep.RowCount
+    errorCount = ep.ErrorCount
+    ep.mu.Unlock()
+    return
+}
+
 type SchemaInfo struct {
     mu sync.Mutex
     tables map[string]*TableInfo
@@ -216,7 +241,7 @@ func (si *SchemaInfo) GetPlan(logStats *sqlQueryStats, sql string) (plan *ExecPl
     if err != nil {
         panic(NewTabletError(FAIL, "%s", err))
     }
-    plan = &ExecPlan{splan, tableInfo, nil}
+    plan = &ExecPlan{ExecPlan: splan, TableInfo: tableInfo}
     if plan.PlanId.IsSelect() {
         if plan.FieldQuery == nil {
             relog.Warning("Cannot cache field info: %s", sql)
@@ -277,28 +302,53 @@ func (si *SchemaInfo) SetReloadTime(reloadTime time.Duration) {
     si.ticks.SetInterval(reloadTime)
 }
 
+type perQueryStats struct {
+    Query string
+    Table string
+    Plan sqlparser.PlanType
+    QueryCount int64
+    Time time.Duration
+    RowCount int64
+    ErrorCount int64
+}
+
 func (si *SchemaInfo) ServeHTTP(response http.ResponseWriter, request *http.Request) {
-    if request.URL.Path == "/debug/schema/query_cache" {
+    if request.URL.Path == "/debug/schema/query_plans" {
         keys := si.queries.Keys()
         response.Header().Set("Content-Type", "text/plain")
-        if keys == nil {
-            response.Write([]byte("empty\n"))
-            return
-        }
         response.Write([]byte(fmt.Sprintf("Length: %d\n", len(keys))))
         for _, v := range keys {
-            response.Write([]byte(fmt.Sprintf("%s\n", v)))
+            response.Write([]byte(fmt.Sprintf("%#v\n", v)))
             if plan := si.getQuery(v); plan != nil {
                 if b, err := json.MarshalIndent(plan.ExecPlan, "", " "); err != nil {
                     response.Write([]byte(err.Error()))
                 } else {
                     response.Write(b)
-                    response.Write(([]byte)("\n\n"))
                 }
+                response.Write(([]byte)("\n\n"))
             }
         }
+    } else if request.URL.Path == "/debug/schema/query_stats" {
+        keys := si.queries.Keys()
+        response.Header().Set("Content-Type", "application/json; charset=utf-8")
+        qstats := make([]perQueryStats, 0, len(keys))
+        for _, v := range keys {
+            if plan := si.getQuery(v); plan != nil {
+                var pqstats perQueryStats
+                pqstats.Query = unicoded(v)
+                pqstats.Table = plan.TableName
+                pqstats.Plan = plan.PlanId
+                pqstats.QueryCount, pqstats.Time, pqstats.RowCount, pqstats.ErrorCount = plan.Stats()
+                qstats = append(qstats, pqstats)
+            }
+        }
+        if b, err := json.MarshalIndent(qstats, "", " "); err != nil {
+            response.Write([]byte(err.Error()))
+        } else {
+            response.Write(b)
+        }
     } else if request.URL.Path == "/debug/schema/tables" {
-        response.Header().Set("Content-Type", "text/plain")
+        response.Header().Set("Content-Type", "application/json; charset=utf-8")
        si.mu.Lock()
        tstats := make(map[string]struct{ hits, absent, misses, invalidations int64 })
        var temp, totals struct{ hits, absent, misses, invalidations int64 }
@@ -333,3 +383,13 @@ func applyFieldFilter(columnNumbers []int, input []mproto.Field) (output []mprot
     }
     return output
 }
+
+// unicoded returns a valid UTF-8 string that json won't reject
+func unicoded(in string) (out string) {
+    for i, v := range in {
+        if v == 0xFFFD {
+            return in[:i]
+        }
+    }
+    return in
+}
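
Each cached ExecPlan now carries its own mutex-guarded counters, and /debug/schema/query_stats serializes them as a JSON array of perQueryStats. The unicoded helper exists because cached query text can contain arbitrary bytes: ranging over a Go string yields the replacement rune 0xFFFD at the first invalid UTF-8 byte, and the helper truncates the string there so the JSON output stays clean. A small standalone sketch of that truncation, reusing only the helper's logic:

package main

import (
    "encoding/json"
    "fmt"
)

// unicoded mirrors the helper above: ranging over a string yields the
// replacement rune (0xFFFD) at the first invalid byte, and everything
// from that byte onward is dropped.
func unicoded(in string) string {
    for i, r := range in {
        if r == 0xFFFD {
            return in[:i]
        }
    }
    return in
}

func main() {
    raw := "select * from t where c = '\xf0\xff'" // invalid UTF-8 tail
    b, _ := json.Marshal(unicoded(raw))
    fmt.Println(string(b)) // prints "select * from t where c = '"
}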

View file

@@ -78,9 +78,9 @@ cases = [
   # (1.foo, 2.bar, 2.foo)
   MultiCase(
-    "SELECT_CACHE_RESULT", # it currently doesn't cache
+    "PASS_SELECT", # it currently doesn't cache
     ['select * from vtocc_cached',
-     Case(query_plan="SELECT_CACHE_RESULT",
+     Case(query_plan="PASS_SELECT",
           sql="select eid, bid, name, foo from vtocc_cached",
           rewritten=[
             "select eid, bid, name, foo from vtocc_cached where 1 != 1",

View file

@@ -18,8 +18,7 @@ class TestNocache(framework.TestCase):
         self.env.execute("begin")
         binary_data = '\x00\'\"\b\n\r\t\x1a\\\x00\x0f\xf0\xff'
         self.env.execute("insert into vtocc_test values(4, null, null, '\\0\\'\\\"\\b\\n\\r\\t\\Z\\\\\x00\x0f\xf0\xff')")
-        bvar = {}
-        bvar['bindata'] = binary_data
+        bvar = {'bindata': binary_data}
         self.env.execute("insert into vtocc_test values(5, null, null, %(bindata)s)", bvar)
         self.env.execute("commit")
         cu = self.env.execute("select * from vtocc_test where intval=4")
@@ -103,8 +102,7 @@
     def test_trailing_comment(self):
         vstart = self.env.debug_vars()
-        bv={}
-        bv["ival"] = 1
+        bv={'ival': 1}
         self.env.execute("select * from vtocc_test where intval=%(ival)s", bv)
         vend = self.env.debug_vars()
         self.assertEqual(vstart.mget("Voltron.QueryCache.Length", 0)+1, vend.Voltron.QueryCache.Length)
@@ -187,10 +185,8 @@
     def test_query_cache(self):
         self.env.execute("set vt_query_cache_size=1")
-        bv={}
-        bv["ival1"] = 1
+        bv={'ival1': 1, 'ival2': 1}
         self.env.execute("select * from vtocc_test where intval=%(ival1)s", bv)
-        bv["ival2"] = 1
         self.env.execute("select * from vtocc_test where intval=%(ival2)s", bv)
         vend = self.env.debug_vars()
         self.assertEqual(vend.Voltron.QueryCache.Length, 1)
@@ -306,11 +302,10 @@
         self.assertEqual(results, [([(1L, 2L, 'bcde', 'fghi')], 1, 0, [('eid', 8), ('id', 3), ('name', 253), ('foo', 253)]), ([(1L, 2L)], 1, 0, [('eid', 8), ('id', 3)])])
 
     def test_bind_in_select(self):
-        bv = {}
-        bv['bv'] = 1
+        bv = {'bv': 1}
         cu = self.env.execute('select %(bv)s from vtocc_test', bv)
         self.assertEqual(cu.description, [('1', 8)])
-        bv['bv'] = 'abcd'
+        bv = {'bv': 'abcd'}
         cu = self.env.execute('select %(bv)s from vtocc_test', bv)
         self.assertEqual(cu.description, [('abcd', 253)])
@@ -341,6 +336,21 @@
         self.env.execute("delete from vtocc_strings")
         self.env.execute("commit")
 
+    def test_health(self):
+        self.assertEqual(self.env.health(), "ok")
+
+    def test_query_stats(self):
+        bv = {'eid': 1}
+        self.env.execute("select eid as query_stats from vtocc_a where eid = %(eid)s", bv)
+        self._verify_query_stats(self.env.query_stats(), "select eid as query_stats from vtocc_a where eid = :eid", "vtocc_a", "PASS_SELECT", 1, 2, 0)
+        try:
+            self.env.execute("select eid as query_stats from vtocc_a where dontexist(eid) = %(eid)s", bv)
+        except (db.MySQLErrors.DatabaseError, db.dbexceptions.OperationalError), e:
+            pass
+        else:
+            self.fail("Did not receive exception: " + query)
+        self._verify_query_stats(self.env.query_stats(), "select eid as query_stats from vtocc_a where dontexist(eid) = :eid", "vtocc_a", "PASS_SELECT", 1, 0, 1)
+
     def _verify_mismatch(self, query, bindvars=None):
         self._verify_error(query, bindvars, "error: Type mismatch")
@@ -355,6 +365,19 @@
         finally:
             self.env.execute("rollback")
 
+    def _verify_query_stats(self, query_stats, query, table, plan, count, rows, errors):
+        for stat in query_stats:
+            if stat["Query"] != query:
+                continue
+            self.assertEqual(stat["Table"], table)
+            self.assertEqual(stat["Plan"], plan)
+            self.assertEqual(stat["QueryCount"], count)
+            self.assertEqual(stat["RowCount"], rows)
+            self.assertEqual(stat["ErrorCount"], errors)
+            self.assertTrue(stat["Time"] > 0)
+            return
+        self.fail("query %s not found" % query)
+
     def test_sqls(self):
         error_count = self.env.run_cases(nocache_cases.cases)
         if error_count != 0:

View file

@@ -49,6 +49,12 @@ class TestEnv(object):
     def table_stats(self):
         return framework.MultiDict(json.load(urllib2.urlopen("http://localhost:9461/debug/schema/tables")))
 
+    def query_stats(self):
+        return json.load(urllib2.urlopen("http://localhost:9461/debug/schema/query_stats"))
+
+    def health(self):
+        return urllib2.urlopen("http://localhost:9461/debug/health").read()
+
     def check_streamlog(self, cases, log):
         error_count = 0
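
The Python helpers simply read the two new endpoints over HTTP. For completeness, a Go client could decode the query_stats payload into the same field names that perQueryStats exports; the URL and port here come from the test environment above and are an assumption, not something fixed by the commit:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// queryStat mirrors the JSON fields emitted by /debug/schema/query_stats.
type queryStat struct {
    Query      string
    Table      string
    Plan       string
    QueryCount int64
    Time       time.Duration
    RowCount   int64
    ErrorCount int64
}

func main() {
    resp, err := http.Get("http://localhost:9461/debug/schema/query_stats")
    if err != nil {
        fmt.Println("fetch failed:", err)
        return
    }
    defer resp.Body.Close()

    var stats []queryStat
    if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
        fmt.Println("decode failed:", err)
        return
    }
    for _, s := range stats {
        fmt.Printf("%-12s %-50s count=%d rows=%d errs=%d time=%v\n",
            s.Plan, s.Query, s.QueryCount, s.RowCount, s.ErrorCount, s.Time)
    }
}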