Merge branch 'master' into replication

This commit is contained in:
Alain Jobart 2014-05-16 12:49:14 -07:00
Родитель a61534618a f1fa61f918
Коммит fcd2db129b
6 изменённых файлов: 152 добавлений и 169 удалений

Просмотреть файл

@ -21,42 +21,29 @@ var droppedMessages = stats.NewCounters("StreamlogDroppedMessages")
// Subscribers can use channels or HTTP.
type StreamLogger struct {
name string
dataQueue chan Formatter
dataQueue chan interface{}
mu sync.Mutex
subscribed map[chan string]url.Values
subscribed map[chan interface{}]struct{}
// size is used to check if there are any subscriptions. Keep
// it atomically in sync with the size of subscribed.
size sync2.AtomicUint32
}
// Formatter defines the interface that messages have to satisfy
// to be broadcast through StreamLogger.
type Formatter interface {
Format(url.Values) string
}
// New returns a new StreamLogger with a buffer that can contain size
// messages. Any messages sent to it will be available at url.
func New(name string, size int) *StreamLogger {
logger := &StreamLogger{
name: name,
dataQueue: make(chan Formatter, size),
subscribed: make(map[chan string]url.Values),
dataQueue: make(chan interface{}, size),
subscribed: make(map[chan interface{}]struct{}),
}
go logger.stream()
return logger
}
// ServeLogs registers the URL on which messages will be broadcast.
// It is safe to register multiple URLs for the same StreamLogger.
func (logger *StreamLogger) ServeLogs(url string) {
http.Handle(url, logger)
log.Infof("Streaming logs from %s at %v.", logger.Name(), url)
}
// Send sends message to all the writers subscribed to logger. Calling
// Send does not block.
func (logger *StreamLogger) Send(message Formatter) {
func (logger *StreamLogger) Send(message interface{}) {
if logger.size.Get() == 0 {
// There are no subscribers, do nothing.
return
@ -70,18 +57,19 @@ func (logger *StreamLogger) Send(message Formatter) {
// Subscribe returns a channel which can be used to listen
// for messages.
func (logger *StreamLogger) Subscribe(params url.Values) chan string {
func (logger *StreamLogger) Subscribe() chan interface{} {
logger.mu.Lock()
defer logger.mu.Unlock()
ch := make(chan string, 1)
logger.subscribed[ch] = params
ch := make(chan interface{}, 1)
var empty struct{}
logger.subscribed[ch] = empty
logger.size.Set(uint32(len(logger.subscribed)))
return ch
}
// Unsubscribe removes the channel from the subscription.
func (logger *StreamLogger) Unsubscribe(ch chan string) {
func (logger *StreamLogger) Unsubscribe(ch chan interface{}) {
logger.mu.Lock()
defer logger.mu.Unlock()
@ -97,14 +85,13 @@ func (logger *StreamLogger) stream() {
}
}
func (logger *StreamLogger) transmit(message Formatter) {
func (logger *StreamLogger) transmit(message interface{}) {
logger.mu.Lock()
defer logger.mu.Unlock()
for ch, params := range logger.subscribed {
messageString := message.Format(params)
for ch := range logger.subscribed {
select {
case ch <- messageString:
case ch <- message:
default:
droppedMessages.Add(logger.name, 1)
}
@ -116,18 +103,22 @@ func (logger *StreamLogger) Name() string {
return logger.name
}
// ServeHTTP is the http handler for StreamLogger.
func (logger *StreamLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
ch := logger.Subscribe(r.Form)
defer logger.Unsubscribe(ch)
for messageString := range ch {
if _, err := io.WriteString(w, messageString); err != nil {
return
// ServeLogs registers the URL on which messages will be broadcast.
// It is safe to register multiple URLs for the same StreamLogger.
func (logger *StreamLogger) ServeLogs(url string, messageFmt func(url.Values, interface{}) string) {
http.HandleFunc(url, func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
w.(http.Flusher).Flush()
}
ch := logger.Subscribe()
defer logger.Unsubscribe(ch)
for message := range ch {
if _, err := io.WriteString(w, messageFmt(r.Form, message)); err != nil {
return
}
w.(http.Flusher).Flush()
}
})
log.Infof("Streaming logs from %s at %v.", logger.Name(), url)
}

Просмотреть файл

@ -111,7 +111,7 @@ func (axp *ActiveTxPool) SafeCommit(transactionId int64) (invalidList map[string
defer handleError(&err, nil)
conn := axp.Get(transactionId)
defer conn.discard(TX_COMMIT)
axp.txStats.Add("Completed", time.Now().Sub(conn.startTime))
axp.txStats.Add("Completed", time.Now().Sub(conn.StartTime))
if _, err = conn.ExecuteFetch(COMMIT, 1, false); err != nil {
conn.Close()
return conn.dirtyTables, NewTabletErrorSql(FAIL, err)
@ -122,7 +122,7 @@ func (axp *ActiveTxPool) SafeCommit(transactionId int64) (invalidList map[string
func (axp *ActiveTxPool) Rollback(transactionId int64) {
conn := axp.Get(transactionId)
defer conn.discard(TX_ROLLBACK)
axp.txStats.Add("Aborted", time.Now().Sub(conn.startTime))
axp.txStats.Add("Aborted", time.Now().Sub(conn.StartTime))
if _, err := conn.ExecuteFetch(ROLLBACK, 1, false); err != nil {
conn.Close()
panic(NewTabletErrorSql(FAIL, err))
@ -158,24 +158,24 @@ func (axp *ActiveTxPool) Stats() (size int64, timeout time.Duration) {
type TxConnection struct {
PoolConnection
transactionId int64
TransactionID int64
pool *ActiveTxPool
inUse bool
startTime time.Time
endTime time.Time
StartTime time.Time
EndTime time.Time
dirtyTables map[string]DirtyKeys
queries []string
conclusion string
Queries []string
Conclusion string
}
func newTxConnection(conn PoolConnection, transactionId int64, pool *ActiveTxPool) *TxConnection {
return &TxConnection{
PoolConnection: conn,
transactionId: transactionId,
TransactionID: transactionId,
pool: pool,
startTime: time.Now(),
StartTime: time.Now(),
dirtyTables: make(map[string]DirtyKeys),
queries: make([]string, 0, 8),
Queries: make([]string, 0, 8),
}
}
@ -192,18 +192,18 @@ func (txc *TxConnection) Recycle() {
if txc.IsClosed() {
txc.discard(TX_CLOSE)
} else {
txc.pool.pool.Put(txc.transactionId)
txc.pool.pool.Put(txc.TransactionID)
}
}
func (txc *TxConnection) RecordQuery(query string) {
txc.queries = append(txc.queries, query)
txc.Queries = append(txc.Queries, query)
}
func (txc *TxConnection) discard(conclusion string) {
txc.conclusion = conclusion
txc.endTime = time.Now()
txc.pool.pool.Unregister(txc.transactionId)
txc.Conclusion = conclusion
txc.EndTime = time.Now()
txc.pool.pool.Unregister(txc.TransactionID)
txc.PoolConnection.Recycle()
// Ensure PoolConnection won't be accessed after Recycle.
txc.PoolConnection = nil
@ -213,12 +213,12 @@ func (txc *TxConnection) discard(conclusion string) {
func (txc *TxConnection) Format(params url.Values) string {
return fmt.Sprintf(
"%v\t%v\t%v\t%.6f\t%v\t%v\t\n",
txc.transactionId,
txc.startTime.Format(time.StampMicro),
txc.endTime.Format(time.StampMicro),
txc.endTime.Sub(txc.startTime).Seconds(),
txc.conclusion,
strings.Join(txc.queries, ";"),
txc.TransactionID,
txc.StartTime.Format(time.StampMicro),
txc.EndTime.Format(time.StampMicro),
txc.EndTime.Sub(txc.StartTime).Seconds(),
txc.Conclusion,
strings.Join(txc.Queries, ";"),
)
}

Просмотреть файл

@ -39,6 +39,9 @@ func startHTMLTable(w http.ResponseWriter) {
table.gridtable tr.high {
background-color: #ff3300;
}
table.gridtable tr.error {
background-color: #00ddff;
}
table.gridtable td {
border-width: 1px;
padding: 4px;

Просмотреть файл

@ -6,12 +6,15 @@ package tabletserver
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
log "github.com/golang/glog"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/streamlog"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
@ -203,11 +206,25 @@ func healthCheck(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
}
func buildFmter(logger *streamlog.StreamLogger) func(url.Values, interface{}) string {
type formatter interface {
Format(url.Values) string
}
return func(params url.Values, val interface{}) string {
fmter, ok := val.(formatter)
if !ok {
return fmt.Sprintf("Error: unexpected value of type %T in %s!", val, logger.Name)
}
return fmter.Format(params)
}
}
// InitQueryService registers the query service, after loading any
// necessary config files. It also starts any relevant streaming logs.
func InitQueryService() {
SqlQueryLogger.ServeLogs(*queryLogHandler)
TxLogger.ServeLogs(*txLogHandler)
SqlQueryLogger.ServeLogs(*queryLogHandler, buildFmter(SqlQueryLogger))
TxLogger.ServeLogs(*txLogHandler, buildFmter(TxLogger))
RegisterQueryService()
}

Просмотреть файл

@ -6,11 +6,13 @@ package tabletserver
import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"text/template"
"time"
log "github.com/golang/glog"
)
var (
@ -28,57 +30,41 @@ var (
<th>SQL</th>
<th>Queries</th>
<th>Sources</th>
<th>Rows</th>
<th>Hits</th>
<th>Misses</th>
<th>Absent</th>
<th>Invalidations</th>
<th>Response Size (Rows)</th>
<th>Cache Hits</th>
<th>Cache Misses</th>
<th>Cache Absent</th>
<th>Cache Invalidations</th>
</tr>
`)
querylogzTmpl = template.Must(template.New("example").Parse(`
<tr class="{{.Color}}">
querylogzFuncMap = template.FuncMap{
"stampMicro": func(t time.Time) string { return t.Format(time.StampMicro) },
"cssWrappable": wrappable,
"unquote": func(s string) string { return strings.Trim(s, "\"") },
}
querylogzTmpl = template.Must(template.New("example").Funcs(querylogzFuncMap).Parse(`
<tr class=".ColorLevel">
<td>{{.Method}}</td>
<td>{{.RemoteAddr}}</td>
<td>{{.Username}}</td>
<td>{{.Start}}</td>
<td>{{.End}}</td>
<td>{{.Duration}}</td>
<td>{{.MySQL}}</td>
<td>{{.Conn}}</td>
<td>{{.StartTime | stampMicro}}</td>
<td>{{.EndTime | stampMicro}}</td>
<td>{{.TotalTime.Seconds}}</td>
<td>{{.MysqlResponseTime.Seconds}}</td>
<td>{{.WaitingForConnection.Seconds}}</td>
<td>{{.PlanType}}</td>
<td>{{.Sql}}</td>
<td>{{.Queries}}</td>
<td>{{.Sources}}</td>
<td>{{.Rows}}</td>
<td>{{.Hits}}</td>
<td>{{.Misses}}</td>
<td>{{.Absent}}</td>
<td>{{.Invalidations}}</td>
<td>{{.OriginalSql | unquote | cssWrappable}}</td>
<td>{{.NumberOfQueries}}</td>
<td>{{.FmtQuerySources}}</td>
<td>{{.SizeOfResponse}}</td>
<td>{{.CacheHits}}</td>
<td>{{.CacheMisses}}</td>
<td>{{.CacheAbsent}}</td>
<td>{{.CacheInvalidations}}</td>
</tr>
`))
)
type querylogzRow struct {
Method string
RemoteAddr string
Username string
Start string
End string
Duration string
MySQL string
Conn string
PlanType string
Sql string
Queries string
Sources string
Rows string
Hits string
Misses string
Absent string
Invalidations string
Color string
}
func init() {
http.HandleFunc("/querylogz", querylogzHandler)
}
@ -86,7 +72,7 @@ func init() {
// querylogzHandler serves a human readable snapshot of the
// current query log.
func querylogzHandler(w http.ResponseWriter, r *http.Request) {
ch := SqlQueryLogger.Subscribe(nil)
ch := SqlQueryLogger.Subscribe()
defer SqlQueryLogger.Unsubscribe(ch)
startHTMLTable(w)
defer endHTMLTable(w)
@ -96,39 +82,28 @@ func querylogzHandler(w http.ResponseWriter, r *http.Request) {
for i := 0; i < 300; i++ {
select {
case out := <-ch:
strs := strings.Split(strings.Trim(out, "\n"), "\t")
if len(strs) < 19 {
querylogzTmpl.Execute(w, &querylogzRow{Method: fmt.Sprintf("Short: %d", len(strs))})
stats, ok := out.(*sqlQueryStats)
if !ok {
err := fmt.Errorf("Unexpected value in %s: %#v (expecting value of type %T)", TxLogger.Name, out, &sqlQueryStats{})
io.WriteString(w, `<tr class="error">`)
io.WriteString(w, err.Error())
io.WriteString(w, "</tr>")
log.Error(err)
continue
}
Value := &querylogzRow{
Method: strs[0],
RemoteAddr: strs[1],
Username: strs[2],
Start: strs[3],
End: strs[4],
Duration: strs[5],
MySQL: strs[12],
Conn: strs[13],
PlanType: strs[6],
Sql: wrappable(strings.Trim(strs[7], "\"")),
Queries: strs[9],
Sources: strs[11],
Rows: strs[14],
Hits: strs[15],
Misses: strs[16],
Absent: strs[17],
Invalidations: strs[18],
}
duration, _ := strconv.ParseFloat(Value.Duration, 64)
if duration < 0.01 {
Value.Color = "low"
} else if duration < 0.1 {
Value.Color = "medium"
var level string
if stats.TotalTime().Seconds() < 0.01 {
level = "low"
} else if stats.TotalTime().Seconds() < 0.1 {
level = "medium"
} else {
Value.Color = "high"
level = "high"
}
querylogzTmpl.Execute(w, Value)
tmplData := struct {
*sqlQueryStats
ColorLevel string
}{stats, level}
querylogzTmpl.Execute(w, tmplData)
case <-deadline:
return
}

Просмотреть файл

@ -7,10 +7,11 @@ package tabletserver
import (
"fmt"
"html/template"
"io"
"net/http"
"strconv"
"strings"
"time"
log "github.com/golang/glog"
)
var (
@ -26,30 +27,24 @@ var (
</tr>
</thead>
`)
txlogzTmpl = template.Must(template.New("example").Parse(`
<tr class="{{.Color}}">
<td>{{.Tid}}</td>
<td>{{.Start}}</td>
<td>{{.End}}</td>
txlogzFuncMap = template.FuncMap{
"stampMicro": func(t time.Time) string { return t.Format(time.StampMicro) },
}
txlogzTmpl = template.Must(template.New("example").Funcs(txlogzFuncMap).Parse(`
<tr class="{{.ColorLevel}}">
<td>{{.TransactionID}}</td>
<td>{{.StartTime | stampMicro}}</td>
<td>{{.EndTime | stampMicro}}</td>
<td>{{.Duration}}</td>
<td>{{.Decision}}</td>
<td>{{.Conclusion}}</td>
<td>
{{ range .Statements }}
{{ range .Queries }}
{{.}}<br>
{{ end}}
</td>
</tr>`))
)
type txlogzRow struct {
Tid string
Start, End string
Duration string
Decision string
Statements []string
Color string
}
func init() {
http.HandleFunc("/txlogz", txlogzHandler)
}
@ -57,7 +52,7 @@ func init() {
// txlogzHandler serves a human readable snapshot of the
// current transaction log.
func txlogzHandler(w http.ResponseWriter, r *http.Request) {
ch := TxLogger.Subscribe(nil)
ch := TxLogger.Subscribe()
defer TxLogger.Unsubscribe(ch)
startHTMLTable(w)
defer endHTMLTable(w)
@ -67,28 +62,30 @@ func txlogzHandler(w http.ResponseWriter, r *http.Request) {
for i := 0; i < 300; i++ {
select {
case out := <-ch:
strs := strings.Split(strings.Trim(out, "\n"), "\t")
if len(strs) < 6 {
txlogzTmpl.Execute(w, &txlogzRow{Tid: fmt.Sprintf("Short: %d", len(strs))})
txc, ok := out.(*TxConnection)
if !ok {
err := fmt.Errorf("Unexpected value in %s: %#v (expecting value of type %T)", TxLogger.Name, out, &TxConnection{})
io.WriteString(w, `<tr class="error">`)
io.WriteString(w, err.Error())
io.WriteString(w, "</tr>")
log.Error(err)
continue
}
Value := &txlogzRow{
Tid: strs[0],
Start: strs[1],
End: strs[2],
Duration: strs[3],
Decision: strs[4],
Statements: strings.Split(strs[5], ";"),
}
duration, _ := strconv.ParseFloat(Value.Duration, 64)
duration := txc.EndTime.Sub(txc.StartTime).Seconds()
var level string
if duration < 0.1 {
Value.Color = "low"
level = "low"
} else if duration < 1.0 {
Value.Color = "medium"
level = "medium"
} else {
Value.Color = "high"
level = "high"
}
txlogzTmpl.Execute(w, Value)
tmplData := struct {
*TxConnection
Duration float64
ColorLevel string
}{txc, duration, level}
txlogzTmpl.Execute(w, tmplData)
case <-deadline:
return
}