Merge pull request #1460 from youtube/vtgate

Fix gateway status aggregator.
This commit is contained in:
Liang 2016-01-29 12:56:57 -08:00
Родитель d5c31c592c 6d144bd7e7
Коммит 6adb187492
2 изменённых файлов: 89 добавлений и 15 удалений

Просмотреть файл

@ -1,4 +1,4 @@
// Copyright 2015, Google Inc. All rights reserved.
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
@ -126,7 +126,7 @@ type GatewayEndPointCacheStatus struct {
QueryCount uint64
QueryError uint64
QPS uint64
AvgLatency uint64 // in milliseconds
AvgLatency float64 // in milliseconds
}
// NewGatewayEndPointStatusAggregator creates a GatewayEndPointStatusAggregator.
@ -154,19 +154,22 @@ type GatewayEndPointStatusAggregator struct {
QueryCount uint64
QueryError uint64
// for QPS and latency (avg value over a minute)
tick uint32
queryCountInMinute [60]uint64
latencyInMinute [60]time.Duration
}
// UpdateQueryInfo updates the aggregator with the given information about a query.
func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(tabletType topodatapb.TabletType, elapsed time.Duration, hasError bool) {
func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(addr string, tabletType topodatapb.TabletType, elapsed time.Duration, hasError bool) {
gepsa.mu.Lock()
defer gepsa.mu.Unlock()
if addr != "" {
gepsa.Addr = addr
}
gepsa.TabletType = tabletType
idx := time.Now().Second() % 60
gepsa.QueryCount++
gepsa.queryCountInMinute[idx]++
gepsa.latencyInMinute[idx] += elapsed
gepsa.queryCountInMinute[gepsa.tick]++
gepsa.latencyInMinute[gepsa.tick] += elapsed
if hasError {
gepsa.QueryError++
}
@ -175,14 +178,14 @@ func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(tabletType topodat
// GetCacheStatus returns a GatewayEndPointCacheStatus representing the current gateway status.
func (gepsa *GatewayEndPointStatusAggregator) GetCacheStatus() *GatewayEndPointCacheStatus {
status := &GatewayEndPointCacheStatus{
Keyspace: gepsa.Keyspace,
Shard: gepsa.Shard,
TabletType: gepsa.TabletType,
Name: gepsa.Name,
Addr: gepsa.Addr,
Keyspace: gepsa.Keyspace,
Shard: gepsa.Shard,
Name: gepsa.Name,
}
gepsa.mu.RLock()
defer gepsa.mu.RUnlock()
status.TabletType = gepsa.TabletType
status.Addr = gepsa.Addr
status.QueryCount = gepsa.QueryCount
status.QueryError = gepsa.QueryError
var totalQuery uint64
@ -195,7 +198,7 @@ func (gepsa *GatewayEndPointStatusAggregator) GetCacheStatus() *GatewayEndPointC
}
status.QPS = totalQuery / 60
if totalQuery > 0 {
status.AvgLatency = uint64(totalLatency.Nanoseconds()) / totalQuery / 100000
status.AvgLatency = float64(totalLatency.Nanoseconds()) / float64(totalQuery) / 1000000
}
return status
}
@ -204,7 +207,7 @@ func (gepsa *GatewayEndPointStatusAggregator) GetCacheStatus() *GatewayEndPointC
func (gepsa *GatewayEndPointStatusAggregator) resetNextSlot() {
gepsa.mu.Lock()
defer gepsa.mu.Unlock()
idx := (time.Now().Second() + 1) % 60
gepsa.queryCountInMinute[idx] = 0
gepsa.latencyInMinute[idx] = 0
gepsa.tick = (gepsa.tick + 1) % 60
gepsa.queryCountInMinute[gepsa.tick] = 0
gepsa.latencyInMinute[gepsa.tick] = time.Duration(0)
}

Просмотреть файл

@ -0,0 +1,71 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vtgate
import (
"reflect"
"testing"
"time"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
func TestGatwayEndPointStatusAggregator(t *testing.T) {
aggr := &GatewayEndPointStatusAggregator{
Keyspace: "k",
Shard: "s",
TabletType: topodatapb.TabletType_REPLICA,
Name: "n",
Addr: "a",
}
t.Logf("aggr = GatwayEndPointStatusAggregator{k, s, replica, n, a}")
aggr.UpdateQueryInfo("", topodatapb.TabletType_REPLICA, 10*time.Millisecond, false)
t.Logf("aggr.UpdateQueryInfo(, replica, 10ms, false)")
aggr.resetNextSlot()
t.Logf("aggr.resetNextSlot()")
aggr.UpdateQueryInfo("", topodatapb.TabletType_REPLICA, 8*time.Millisecond, false)
t.Logf("aggr.UpdateQueryInfo(, replica, 8ms, false)")
aggr.UpdateQueryInfo("", topodatapb.TabletType_REPLICA, 3*time.Millisecond, true)
t.Logf("aggr.UpdateQueryInfo(, replica, 3ms, true)")
want := &GatewayEndPointCacheStatus{
Keyspace: "k",
Shard: "s",
Name: "n",
TabletType: topodatapb.TabletType_REPLICA,
Addr: "a",
QueryCount: 3,
QueryError: 1,
QPS: 0,
AvgLatency: 7,
}
got := aggr.GetCacheStatus()
if !reflect.DeepEqual(got, want) {
t.Errorf("aggr.GetCacheStatus() = %+v, want %+v", got, want)
}
// reset values in idx=0
for i := 0; i < 59; i++ {
aggr.resetNextSlot()
}
t.Logf("59 aggr.resetNextSlot()")
aggr.UpdateQueryInfo("b", topodatapb.TabletType_MASTER, 9*time.Millisecond, false)
t.Logf("aggr.UpdateQueryInfo(b, master, 9ms, false)")
aggr.UpdateQueryInfo("", topodatapb.TabletType_MASTER, 6*time.Millisecond, true)
t.Logf("aggr.UpdateQueryInfo(, master, 4ms, true)")
want = &GatewayEndPointCacheStatus{
Keyspace: "k",
Shard: "s",
Name: "n",
TabletType: topodatapb.TabletType_MASTER,
Addr: "b",
QueryCount: 5,
QueryError: 2,
QPS: 0,
AvgLatency: 6.5,
}
got = aggr.GetCacheStatus()
if !reflect.DeepEqual(got, want) {
t.Errorf("aggr.GetCacheStatus() = %+v, want %+v", got, want)
}
}