Adding a status view of topo in vtgate.

This commit is contained in:
Alain Jobart 2014-05-14 13:08:22 -07:00
Родитель b1c09e4e31
Коммит 208a7e99a6
5 изменённых файлов: 387 добавлений и 25 удалений

84
go/cmd/vtgate/status.go Normal file
Просмотреть файл

@ -0,0 +1,84 @@
package main
import (
"github.com/youtube/vitess/go/vt/servenv"
_ "github.com/youtube/vitess/go/vt/status"
)
var (
// topoTemplate is the template source for the "Topology Cache" section of
// the vtgate status page. It renders three tables from the data it is
// given: .SrvKeyspaceNames (one row per cell), .SrvKeyspaces (one row per
// cell/keyspace) and .EndPoints (one row per cell/keyspace/shard/tablet
// type). In every row, a non-nil LastError is shown in bold instead of the
// cached value.
// NOTE(review): presumably parsed by servenv as an html/template, since the
// rows call StatusAsHTML -- confirm in servenv.AddStatusPart.
topoTemplate = `
<style>
table {
border-collapse: collapse;
}
td, th {
border: 1px solid #999;
padding: 0.5rem;
}
</style>
<table>
<tr>
<th colspan="2">SrvKeyspace Names Cache</th>
</tr>
<tr>
<th>Cell</th>
<th>SrvKeyspace Names</th>
</tr>
{{range $i, $skn := .SrvKeyspaceNames}}
<tr>
<td>{{$skn.Cell}}</td>
<td>{{if $skn.LastError}}<b>{{$skn.LastError}}</b>{{else}}{{range $j, $value := $skn.Value}}{{$value}}&nbsp;{{end}}{{end}}</td>
</tr>
{{end}}
</table>
<br>
<table>
<tr>
<th colspan="3">SrvKeyspace Cache</th>
</tr>
<tr>
<th>Cell</th>
<th>Keyspace</th>
<th>SrvKeyspace</th>
</tr>
{{range $i, $skn := .SrvKeyspaces}}
<tr>
<td>{{$skn.Cell}}</td>
<td>{{$skn.Keyspace}}</td>
<td>{{if $skn.LastError}}<b>{{$skn.LastError}}</b>{{else}}{{$skn.StatusAsHTML}}{{end}}</td>
</tr>
{{end}}
</table>
<br>
<table>
<tr>
<th colspan="5">EndPoints Cache</th>
</tr>
<tr>
<th>Cell</th>
<th>Keyspace</th>
<th>Shard</th>
<th>TabletType</th>
<th>EndPoints</th>
</tr>
{{range $i, $skn := .EndPoints}}
<tr>
<td>{{$skn.Cell}}</td>
<td>{{$skn.Keyspace}}</td>
<td>{{$skn.Shard}}</td>
<td>{{$skn.TabletType}}</td>
<td>{{if $skn.LastError}}<b>{{$skn.LastError}}</b>{{else}}{{$skn.StatusAsHTML}}{{end}}</td>
</tr>
{{end}}
</table>
<small>This is just a cache, so some data may not be visible here yet.</small>
`
)
func init() {
servenv.OnRun(func() {
servenv.AddStatusPart("Topology Cache", topoTemplate, func() interface{} {
return resilientSrvTopoServer.CacheStatus()
})
})
}

Просмотреть файл

@ -22,6 +22,7 @@ var (
timeout = flag.Duration("timeout", 5*time.Second, "connection and call timeout")
)
var resilientSrvTopoServer *vtgate.ResilientSrvTopoServer
var topoReader *TopoReader
func main() {
@ -34,14 +35,14 @@ func main() {
ts := topo.GetServer()
defer topo.CloseServers()
rts := vtgate.NewResilientSrvTopoServer(ts, "ResilientSrvTopoServerCounts")
resilientSrvTopoServer = vtgate.NewResilientSrvTopoServer(ts, "ResilientSrvTopoServerCounts")
stats.Publish("EndpointCount", stats.CountersFunc(rts.HealthyEndpointCount))
stats.Publish("DegradedEndpointCount", stats.CountersFunc(rts.DegradedEndpointCount))
stats.Publish("EndpointCount", stats.CountersFunc(resilientSrvTopoServer.HealthyEndpointCount))
stats.Publish("DegradedEndpointCount", stats.CountersFunc(resilientSrvTopoServer.DegradedEndpointCount))
topoReader = NewTopoReader(rts)
topoReader = NewTopoReader(resilientSrvTopoServer)
topo.RegisterTopoReader(topoReader)
vtgate.Init(rts, *cell, *retryDelay, *retryCount, *timeout)
vtgate.Init(resilientSrvTopoServer, *cell, *retryDelay, *retryCount, *timeout)
servenv.Run()
}

Просмотреть файл

@ -6,6 +6,9 @@ package vtgate
import (
"flag"
"fmt"
"html/template"
"sort"
"sync"
"time"
@ -54,6 +57,9 @@ type ResilientSrvTopoServer struct {
}
type srvKeyspaceNamesEntry struct {
// immutable values
cell string
// the mutex protects any access to this structure (read or write)
mutex sync.Mutex
@ -63,6 +69,10 @@ type srvKeyspaceNamesEntry struct {
}
type srvKeyspaceEntry struct {
// immutable values
cell string
keyspace string
// the mutex protects any access to this structure (read or write)
mutex sync.Mutex
@ -72,16 +82,22 @@ type srvKeyspaceEntry struct {
}
type endPointsEntry struct {
// immutable values
cell string
keyspace string
shard string
tabletType topo.TabletType
// the mutex protects any access to this structure (read or write)
mutex sync.Mutex
insertionTime time.Time
// Value is the end points that were returned to the client.
Value *topo.EndPoints
// OriginalValue is the end points that were returned from
// value is the end points that were returned to the client.
value *topo.EndPoints
// originalValue is the end points that were returned from
// the topology server.
OriginalValue *topo.EndPoints
originalValue *topo.EndPoints
lastError error
}
@ -135,7 +151,9 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(cell string) ([]string
server.mutex.Lock()
entry, ok := server.srvKeyspaceNamesCache[key]
if !ok {
entry = &srvKeyspaceNamesEntry{}
entry = &srvKeyspaceNamesEntry{
cell: cell,
}
server.srvKeyspaceNamesCache[key] = entry
}
server.mutex.Unlock()
@ -159,8 +177,8 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(cell string) ([]string
log.Errorf("GetSrvKeyspaceNames(%v) failed: %v (no cached value, caching and returning error)", cell, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvKeyspaceNames(%v) failed: %v (returning cached value)", cell, err)
return entry.value, nil
log.Warningf("GetSrvKeyspaceNames(%v) failed: %v (returning cached value: %v %v)", cell, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
}
@ -179,7 +197,10 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(cell, keyspace string) (*to
server.mutex.Lock()
entry, ok := server.srvKeyspaceCache[key]
if !ok {
entry = &srvKeyspaceEntry{}
entry = &srvKeyspaceEntry{
cell: cell,
keyspace: keyspace,
}
server.srvKeyspaceCache[key] = entry
}
server.mutex.Unlock()
@ -203,8 +224,8 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(cell, keyspace string) (*to
log.Errorf("GetSrvKeyspace(%v, %v) failed: %v (no cached value, caching and returning error)", cell, keyspace, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvKeyspace(%v, %v) failed: %v (returning cached value)", cell, keyspace, err)
return entry.value, nil
log.Warningf("GetSrvKeyspace(%v, %v) failed: %v (returning cached value: %v %v)", cell, keyspace, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
}
@ -223,7 +244,12 @@ func (server *ResilientSrvTopoServer) GetEndPoints(cell, keyspace, shard string,
server.mutex.Lock()
entry, ok := server.endPointsCache[key]
if !ok {
entry = &endPointsEntry{}
entry = &endPointsEntry{
cell: cell,
keyspace: keyspace,
shard: shard,
tabletType: tabletType,
}
server.endPointsCache[key] = entry
}
server.mutex.Unlock()
@ -236,7 +262,7 @@ func (server *ResilientSrvTopoServer) GetEndPoints(cell, keyspace, shard string,
// If the entry is fresh enough, return it
if time.Now().Sub(entry.insertionTime) < server.cacheTTL {
return entry.Value, entry.lastError
return entry.value, entry.lastError
}
// not in cache or too old, get the real value
@ -245,41 +271,257 @@ func (server *ResilientSrvTopoServer) GetEndPoints(cell, keyspace, shard string,
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", cell, keyspace, shard, tabletType, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetEndPoints(%v, %v, %v, %v) failed: %v (returning cached value: %v %v)", cell, keyspace, shard, tabletType, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
server.counts.Add(cachedCategory, 1)
log.Warningf("GetEndPoints(%v, %v, %v, %v) failed: %v (returning cached value)", cell, keyspace, shard, tabletType, err)
return entry.Value, nil
}
// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
entry.OriginalValue = result
entry.Value = filterUnhealthyServers(result)
entry.originalValue = result
entry.value = filterUnhealthyServers(result)
entry.lastError = err
return entry.Value, err
return entry.value, err
}
// HealthyEndpointCount returns how many valid endpoints we have in the cache.
// The result maps each end points cache key to the number of entries in the
// filtered value that is served to clients. An entry whose value is nil
// (which happens when the lookup failed) is reported as 0 instead of
// triggering a nil pointer dereference.
func (server *ResilientSrvTopoServer) HealthyEndpointCount() map[string]int64 {
	result := make(map[string]int64)
	server.mutex.Lock()
	defer server.mutex.Unlock()
	for k, entry := range server.endPointsCache {
		entry.mutex.Lock()
		// value can be nil in case of error
		vl := 0
		if entry.value != nil {
			vl = len(entry.value.Entries)
		}
		entry.mutex.Unlock()
		result[k] = int64(vl)
	}
	return result
}
// DegradedEndpointCount returns how many degraded endpoints we have
// in the cache (entries that are not 100% healthy, because they are behind
// on replication for instance).
// For each cache key the count is the number of entries returned by the
// topology server minus the number of entries kept in the value served to
// clients. Both values are only read through nil checks, so an entry that
// cached an error counts as 0 rather than panicking.
func (server *ResilientSrvTopoServer) DegradedEndpointCount() map[string]int64 {
	result := make(map[string]int64)
	server.mutex.Lock()
	defer server.mutex.Unlock()
	for k, entry := range server.endPointsCache {
		entry.mutex.Lock()
		// originalValue and value can be nil in case of error
		ovl := 0
		if entry.originalValue != nil {
			ovl = len(entry.originalValue.Entries)
		}
		vl := 0
		if entry.value != nil {
			vl = len(entry.value.Entries)
		}
		entry.mutex.Unlock()
		result[k] = int64(ovl - vl)
	}
	return result
}
// The next few structures and methods are used to get a displayable
// version of the cache in a status page
// SrvKeyspaceNamesCacheStatus is a displayable copy of one cached
// SrvKeyspaceNames lookup.
type SrvKeyspaceNamesCacheStatus struct {
	// Cell is the cell the keyspace names were looked up in.
	Cell string
	// Value is the cached list of keyspace names.
	Value []string
	// LastError is the error stored with the cache entry, if any.
	LastError error
}

// SrvKeyspaceNamesCacheStatusList implements sort.Interface, ordering the
// statuses by cell name.
type SrvKeyspaceNamesCacheStatusList []*SrvKeyspaceNamesCacheStatus

// Len is part of sort.Interface
func (list SrvKeyspaceNamesCacheStatusList) Len() int {
	size := len(list)
	return size
}

// Less is part of sort.Interface
func (list SrvKeyspaceNamesCacheStatusList) Less(i, j int) bool {
	left, right := list[i].Cell, list[j].Cell
	return left < right
}

// Swap is part of sort.Interface
func (list SrvKeyspaceNamesCacheStatusList) Swap(i, j int) {
	held := list[i]
	list[i] = list[j]
	list[j] = held
}
// SrvKeyspaceCacheStatus is the current value for a SrvKeyspace object.
// It is a displayable copy of one cache entry, built by CacheStatus.
type SrvKeyspaceCacheStatus struct {
// Cell is the cell the SrvKeyspace was looked up in.
Cell string
// Keyspace is the name of the keyspace that was looked up.
Keyspace string
// Value is the cached SrvKeyspace; StatusAsHTML treats nil as "No Data".
Value *topo.SrvKeyspace
// LastError is the error stored with the cache entry, if any.
LastError error
}
// StatusAsHTML returns an HTML version of our status.
// It works best if there is data in the cache: without a cached value it
// only returns "No Data".
//
// Every value taken from the topology data is HTML-escaped before being
// spliced into the markup, because the result is returned as template.HTML
// and is therefore not escaped again by the template engine. The
// per-tablet-type lines of the Partitions and ServedFrom sections are sorted:
// both are maps, Go randomizes map iteration order, and the status page
// would otherwise reshuffle on every refresh.
func (st *SrvKeyspaceCacheStatus) StatusAsHTML() template.HTML {
	if st.Value == nil {
		return template.HTML("No Data")
	}
	esc := template.HTMLEscapeString

	// Render one line per tablet type, then sort the rendered lines; each
	// line starts with the tablet type, so this orders them by tablet type.
	partitions := make([]string, 0, len(st.Value.Partitions))
	for tabletType, keyspacePartition := range st.Value.Partitions {
		line := "&nbsp;<b>" + esc(string(tabletType)) + "</b>"
		for _, shard := range keyspacePartition.Shards {
			line += "&nbsp;" + esc(shard.ShardName())
		}
		partitions = append(partitions, line+"<br>")
	}
	sort.Strings(partitions)

	result := "<b>Partitions:</b><br>"
	for _, line := range partitions {
		result += line
	}

	// TabletTypes is a slice, so its order is already stable.
	result += "<b>TabletTypes:</b>"
	for _, tabletType := range st.Value.TabletTypes {
		result += "&nbsp;" + esc(string(tabletType))
	}
	result += "<br>"

	// The sharding column is only shown when one is configured.
	if st.Value.ShardingColumnName != "" {
		result += "<b>ShardingColumnName:</b>&nbsp;" + esc(st.Value.ShardingColumnName) + "<br>"
		result += "<b>ShardingColumnType:</b>&nbsp;" + esc(string(st.Value.ShardingColumnType)) + "<br>"
	}

	if len(st.Value.ServedFrom) > 0 {
		servedFrom := make([]string, 0, len(st.Value.ServedFrom))
		for tabletType, keyspace := range st.Value.ServedFrom {
			servedFrom = append(servedFrom,
				"&nbsp;<b>"+esc(string(tabletType))+"</b>&nbsp;"+esc(keyspace)+"<br>")
		}
		sort.Strings(servedFrom)

		result += "<b>ServedFrom:</b><br>"
		for _, line := range servedFrom {
			result += line
		}
	}
	return template.HTML(result)
}
// SrvKeyspaceCacheStatusList is used for sorting.
// Entries are ordered by cell first, then by keyspace.
type SrvKeyspaceCacheStatusList []*SrvKeyspaceCacheStatus

// Len is part of sort.Interface
func (skcsl SrvKeyspaceCacheStatusList) Len() int {
	return len(skcsl)
}

// Less is part of sort.Interface.
// The fields are compared one at a time instead of comparing the joined
// "cell:keyspace" strings: with the joined form a cell name that extends
// another one with a byte smaller than ':' (a digit or '-', e.g. "nj1"
// versus "nj") sorts before the shorter name, and every comparison builds
// two temporary strings.
func (skcsl SrvKeyspaceCacheStatusList) Less(i, j int) bool {
	a, b := skcsl[i], skcsl[j]
	if a.Cell != b.Cell {
		return a.Cell < b.Cell
	}
	return a.Keyspace < b.Keyspace
}

// Swap is part of sort.Interface
func (skcsl SrvKeyspaceCacheStatusList) Swap(i, j int) {
	skcsl[i], skcsl[j] = skcsl[j], skcsl[i]
}
// EndPointsCacheStatus is the current value for an EndPoints object.
// It is a displayable copy of one cache entry, built by CacheStatus.
type EndPointsCacheStatus struct {
// Cell, Keyspace, Shard and TabletType identify the lookup this entry
// caches.
Cell string
Keyspace string
Shard string
TabletType topo.TabletType
// Value is the cached end points value; StatusAsHTML counts its entries
// as the happy ones.
Value *topo.EndPoints
// OriginalValue is the cached original end points value; StatusAsHTML
// uses its entry count as the total.
OriginalValue *topo.EndPoints
// LastError is the error stored with the cache entry, if any.
LastError error
}
// StatusAsHTML returns an HTML version of our status.
// It works best if there is data in the cache.
//
// The summary compares the number of entries in OriginalValue (total) with
// the number in Value (happy); a nil pointer counts as zero entries. When
// both counts match and are non-zero, the Health field of the first original
// entry decides between the "unhappy" and the "happy" wording.
func (st *EndPointsCacheStatus) StatusAsHTML() template.HTML {
	var total, happy int
	if st.OriginalValue != nil {
		total = len(st.OriginalValue.Entries)
	}
	if st.Value != nil {
		happy = len(st.Value.Entries)
	}

	switch {
	case total != happy:
		// some entries were dropped from the served value
		return template.HTML(fmt.Sprintf("%v out of %v values are happy", happy, total))
	case happy == 0:
		return template.HTML("<b>No entries</b>")
	case len(st.OriginalValue.Entries[0].Health) > 0:
		// nothing was dropped, yet the first entry carries health data
		return template.HTML(fmt.Sprintf("<b>All %v values are unhappy</b>", happy))
	default:
		return template.HTML(fmt.Sprintf("%v values are happy", happy))
	}
}
// EndPointsCacheStatusList is used for sorting.
// Entries are ordered by cell, then keyspace, then shard, then tablet type.
type EndPointsCacheStatusList []*EndPointsCacheStatus

// Len is part of sort.Interface
func (epcsl EndPointsCacheStatusList) Len() int {
	return len(epcsl)
}

// Less is part of sort.Interface.
// The fields are compared one at a time instead of comparing the joined
// "cell:keyspace:shard:type" strings: with the joined form a name that
// extends another one with a byte smaller than ':' (a digit or '-', which
// is common in shard names) sorts before the shorter name, and every
// comparison builds two temporary strings.
func (epcsl EndPointsCacheStatusList) Less(i, j int) bool {
	a, b := epcsl[i], epcsl[j]
	if a.Cell != b.Cell {
		return a.Cell < b.Cell
	}
	if a.Keyspace != b.Keyspace {
		return a.Keyspace < b.Keyspace
	}
	if a.Shard != b.Shard {
		return a.Shard < b.Shard
	}
	return string(a.TabletType) < string(b.TabletType)
}

// Swap is part of sort.Interface
func (epcsl EndPointsCacheStatusList) Swap(i, j int) {
	epcsl[i], epcsl[j] = epcsl[j], epcsl[i]
}
// ResilientSrvTopoServerCacheStatus has the full status of the cache.
// It is what CacheStatus returns; each list is sorted before being returned.
type ResilientSrvTopoServerCacheStatus struct {
// SrvKeyspaceNames has one element per srvKeyspaceNamesCache entry.
SrvKeyspaceNames SrvKeyspaceNamesCacheStatusList
// SrvKeyspaces has one element per srvKeyspaceCache entry.
SrvKeyspaces SrvKeyspaceCacheStatusList
// EndPoints has one element per endPointsCache entry.
EndPoints EndPointsCacheStatusList
}
// CacheStatus returns a displayable version of the cache.
// The server mutex is held while the three caches are walked, and each
// entry's own mutex is held while that entry's fields are copied. The
// resulting lists are sorted after all locks have been released.
func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCacheStatus {
	status := new(ResilientSrvTopoServerCacheStatus)

	server.mutex.Lock()

	for _, names := range server.srvKeyspaceNamesCache {
		names.mutex.Lock()
		snapshot := &SrvKeyspaceNamesCacheStatus{
			Cell:      names.cell,
			Value:     names.value,
			LastError: names.lastError,
		}
		names.mutex.Unlock()
		status.SrvKeyspaceNames = append(status.SrvKeyspaceNames, snapshot)
	}

	for _, ks := range server.srvKeyspaceCache {
		ks.mutex.Lock()
		snapshot := &SrvKeyspaceCacheStatus{
			Cell:      ks.cell,
			Keyspace:  ks.keyspace,
			Value:     ks.value,
			LastError: ks.lastError,
		}
		ks.mutex.Unlock()
		status.SrvKeyspaces = append(status.SrvKeyspaces, snapshot)
	}

	for _, ep := range server.endPointsCache {
		ep.mutex.Lock()
		snapshot := &EndPointsCacheStatus{
			Cell:          ep.cell,
			Keyspace:      ep.keyspace,
			Shard:         ep.shard,
			TabletType:    ep.tabletType,
			Value:         ep.value,
			OriginalValue: ep.originalValue,
			LastError:     ep.lastError,
		}
		ep.mutex.Unlock()
		status.EndPoints = append(status.EndPoints, snapshot)
	}

	server.mutex.Unlock()

	// the lists are private copies, so no lock is needed to sort them
	sort.Sort(status.SrvKeyspaceNames)
	sort.Sort(status.SrvKeyspaces)
	sort.Sort(status.EndPoints)
	return status
}

Просмотреть файл

@ -258,4 +258,14 @@ func TestCachedErrors(t *testing.T) {
if ft.callCount != 1 {
t.Fatalf("GetSrvKeyspace was called again: %u times", ft.callCount)
}
// ask again after expired cache, should get an error
rsts.cacheTTL = 0
_, err = rsts.GetSrvKeyspace("", "unknown_ks")
if err == nil {
t.Fatalf("Third GetSrvKeyspace didn't return an error")
}
if ft.callCount != 2 {
t.Fatalf("GetSrvKeyspace was not called again: %u times", ft.callCount)
}
}

Просмотреть файл

@ -8,6 +8,7 @@ import urllib2
import environment
import tablet
import utils
from zk import zkocc
# range "" - 80
@ -23,6 +24,9 @@ scrap = tablet.Tablet()
# all tablets
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica,
idle, scrap, shard_0_spare]
# vtgate
vtgate_server = None
vtgate_port = None
class VtctldError(Exception): pass
@ -83,6 +87,8 @@ def tearDownModule():
if utils.options.skip_teardown:
return
utils.vtgate_kill(vtgate_server)
teardown_procs = [t.teardown_mysql() for t in tablets]
utils.wait_procs(teardown_procs, raise_on_error=False)
@ -136,6 +142,10 @@ class TestVtctld(unittest.TestCase):
# run checks now before we start the tablets
utils.validate_topology()
# start a vtgate server too
global vtgate_server, vtgate_port
vtgate_server, vtgate_port = utils.vtgate_start(cache_ttl='0s')
def setUp(self):
self.data = vtctld.dbtopo()
self.serving_data = vtctld.serving_graph()
@ -189,5 +199,20 @@ class TestVtctld(unittest.TestCase):
self.assertIn('Alias: <a href="http://localhost:', shard_0_replica_status)
self.assertIn('</html>', shard_0_replica_status)
# Smoke test for the vtgate status page: issue one topology query of each
# kind through vtgate, then fetch the status page and check that it was
# rendered through to its closing tag.
def test_vtgate(self):
# do a few vtgate topology queries to prime the cache
vtgate_client = zkocc.ZkOccConnection("localhost:%u" % vtgate_port,
"test_nj", 30.0)
vtgate_client.dial()
# one call per cache shown on the page: keyspace names, keyspace, end points
vtgate_client.get_srv_keyspace_names("test_nj")
vtgate_client.get_srv_keyspace("test_nj", "test_keyspace")
vtgate_client.get_end_points("test_nj", "test_keyspace", "-80", "master")
vtgate_client.close()
# only the presence of the closing tag is asserted, not the table contents
vtgate_status = utils.get_status(vtgate_port)
self.assertIn('</html>', vtgate_status)
# NOTE(review): presumably utils.pause only blocks when the test run asks
# for manual inspection -- confirm in utils
utils.pause("You can now run a browser and connect to http://localhost:%u%s to manually check vtgate status page" % (vtgate_port, environment.status_url))
if __name__ == '__main__':
utils.main()