Merge pull request #1699 from guoliang100/cleanup

Clean up GetEndPoint in VTGate.
Liang 2016-05-13 11:06:00 -07:00
Parent 4553f359b6 a9f0f1e6f5
Commit 8c9ec45e42
5 changed files with 11 additions and 353 deletions
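Net effect on the VTGate-facing contract, sketched here for quick reference (assembled from the interface hunk below; context and topodatapb are the identifiers already used in that file, not new dependencies):

// SrvTopoServer is the interface VTGate code uses to read serving topology data.
// After this commit it no longer carries GetEndPoints; endpoint lookup is left
// to the new endpoint implementation referenced in the removed status template.
type SrvTopoServer interface {
	GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
	GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
	GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error)
	WatchVSchema(ctx context.Context, keyspace string) (notifications <-chan string, err error)
}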

View file

@@ -50,29 +50,6 @@ var (
</tr>
{{end}}
</table>
<br>
<table>
<tr>
<th colspan="5">EndPoints Cache (NOT USED for new endpoint implementation)</th>
</tr>
<tr>
<th>Cell</th>
<th>Keyspace</th>
<th>Shard</th>
<th>TabletType</th>
<th>EndPoints</th>
</tr>
{{range $i, $ep := .EndPoints}}
<tr>
<td>{{github_com_youtube_vitess_vtctld_srv_cell $ep.Cell}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_keyspace $ep.Cell $ep.Keyspace}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_shard $ep.Cell $ep.Keyspace $ep.Shard}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_type $ep.Cell $ep.Keyspace $ep.Shard $ep.TabletType}}</td>
<td>{{if $ep.LastError}}<b>{{$ep.LastError}}</b>{{else}}{{$ep.StatusAsHTML}}{{end}}</td>
</tr>
{{end}}
</table>
<small>This is just a cache, so some data may not be visible here yet. It is empty if using new endpoint implementation.</small>
`
statsTemplate = `

View file

@@ -351,7 +351,6 @@ type SrvTopoServer interface {
GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error)
GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (*topodatapb.EndPoints, int64, error)
WatchVSchema(ctx context.Context, keyspace string) (notifications <-chan string, err error)
}

View file

@@ -23,17 +23,14 @@ import (
)
var (
srvTopoCacheTTL = flag.Duration("srv_topo_cache_ttl", 1*time.Second, "how long to use cached entries for topology")
enableRemoteMaster = flag.Bool("enable_remote_master", false, "enable remote master access")
srvTopoTimeout = flag.Duration("srv_topo_timeout", 2*time.Second, "topo server timeout")
srvTopoCacheTTL = flag.Duration("srv_topo_cache_ttl", 1*time.Second, "how long to use cached entries for topology")
srvTopoTimeout = flag.Duration("srv_topo_timeout", 2*time.Second, "topo server timeout")
)
const (
queryCategory = "query"
cachedCategory = "cached"
errorCategory = "error"
remoteQueryCategory = "remote-query"
remoteErrorCategory = "remote-error"
queryCategory = "query"
cachedCategory = "cached"
errorCategory = "error"
)
// ResilientSrvTopoServer is an implementation of SrvTopoServer based
@@ -41,10 +38,9 @@ const (
// - limit the QPS to the underlying topo.Server
// - return the last known value of the data if there is an error
type ResilientSrvTopoServer struct {
topoServer topo.Server
cacheTTL time.Duration
enableRemoteMaster bool
counts *stats.Counters
topoServer topo.Server
cacheTTL time.Duration
counts *stats.Counters
// mutex protects the cache map itself, not the individual
// values in the cache.
@@ -52,39 +48,6 @@ type ResilientSrvTopoServer struct {
srvKeyspaceNamesCache map[string]*srvKeyspaceNamesEntry
srvKeyspaceCache map[string]*srvKeyspaceEntry
srvShardCache map[string]*srvShardEntry
endPointsCache map[string]*endPointsEntry
// GetEndPoints stats.
endPointCounters *endPointCounters
}
type endPointCounters struct {
queries *stats.MultiCounters
errors *stats.MultiCounters
emptyResults *stats.MultiCounters
remoteQueries *stats.MultiCounters
numberReturned *stats.MultiCounters
cacheHits *stats.MultiCounters
remoteLookups *stats.MultiCounters
remoteLookupErrors *stats.MultiCounters
lookupErrors *stats.MultiCounters
staleCacheFallbacks *stats.MultiCounters
}
func newEndPointCounters(counterPrefix string) *endPointCounters {
labels := []string{"Cell", "Keyspace", "ShardName", "DbType"}
return &endPointCounters{
queries: stats.NewMultiCounters(counterPrefix+"EndPointQueryCount", labels),
errors: stats.NewMultiCounters(counterPrefix+"EndPointErrorCount", labels),
emptyResults: stats.NewMultiCounters(counterPrefix+"EndPointEmptyResultCount", labels),
numberReturned: stats.NewMultiCounters(counterPrefix+"EndPointsReturnedCount", labels),
cacheHits: stats.NewMultiCounters(counterPrefix+"EndPointCacheHitCount", labels),
remoteQueries: stats.NewMultiCounters(counterPrefix+"EndPointRemoteQueryCount", labels),
remoteLookups: stats.NewMultiCounters(counterPrefix+"EndPointRemoteLookupCount", labels),
remoteLookupErrors: stats.NewMultiCounters(counterPrefix+"EndPointRemoteLookupErrorCount", labels),
lookupErrors: stats.NewMultiCounters(counterPrefix+"EndPointLookupErrorCount", labels),
staleCacheFallbacks: stats.NewMultiCounters(counterPrefix+"EndPointStaleCacheFallbackCount", labels),
}
}
type srvKeyspaceNamesEntry struct {
@@ -158,41 +121,17 @@ type srvShardEntry struct {
lastErrorCtx context.Context
}
type endPointsEntry struct {
// unmutable values
cell string
keyspace string
shard string
tabletType topodatapb.TabletType
remote bool
// the mutex protects any access to this structure (read or write)
mutex sync.Mutex
insertionTime time.Time
// value is the end points that were returned to the client.
value *topodatapb.EndPoints
lastError error
lastErrorCtx context.Context
}
// NewResilientSrvTopoServer creates a new ResilientSrvTopoServer
// based on the provided topo.Server.
func NewResilientSrvTopoServer(base topo.Server, counterPrefix string) *ResilientSrvTopoServer {
return &ResilientSrvTopoServer{
topoServer: base,
cacheTTL: *srvTopoCacheTTL,
enableRemoteMaster: *enableRemoteMaster,
counts: stats.NewCounters(counterPrefix + "Counts"),
topoServer: base,
cacheTTL: *srvTopoCacheTTL,
counts: stats.NewCounters(counterPrefix + "Counts"),
srvKeyspaceNamesCache: make(map[string]*srvKeyspaceNamesEntry),
srvKeyspaceCache: make(map[string]*srvKeyspaceEntry),
srvShardCache: make(map[string]*srvShardEntry),
endPointsCache: make(map[string]*endPointsEntry),
endPointCounters: newEndPointCounters(counterPrefix),
}
}
@@ -401,106 +340,6 @@ func (server *ResilientSrvTopoServer) GetSrvShard(ctx context.Context, cell, key
return result, err
}
// GetEndPoints return all endpoints for the given cell, keyspace, shard, and tablet type.
func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (result *topodatapb.EndPoints, version int64, err error) {
shard = strings.ToLower(shard)
key := []string{cell, keyspace, shard, strings.ToLower(tabletType.String())}
server.counts.Add(queryCategory, 1)
server.endPointCounters.queries.Add(key, 1)
// find the entry in the cache, add it if not there
keyStr := strings.Join(key, ".")
server.mutex.Lock()
entry, ok := server.endPointsCache[keyStr]
if !ok {
entry = &endPointsEntry{
cell: cell,
keyspace: keyspace,
shard: shard,
tabletType: tabletType,
}
server.endPointsCache[keyStr] = entry
}
server.mutex.Unlock()
// Lock the entry, and do everything holding the lock. This
// means two concurrent requests will only issue one
// underlying query.
entry.mutex.Lock()
defer entry.mutex.Unlock()
// Whether the query was serviced with remote endpoints.
remote := false
// Record some stats regardless of cache status.
defer func() {
if remote {
server.endPointCounters.remoteQueries.Add(key, 1)
}
if err != nil {
server.endPointCounters.errors.Add(key, 1)
return
}
if result == nil || len(result.Entries) == 0 {
server.endPointCounters.emptyResults.Add(key, 1)
return
}
server.endPointCounters.numberReturned.Add(key, int64(len(result.Entries)))
}()
// If the entry is fresh enough, return it
if time.Now().Sub(entry.insertionTime) < server.cacheTTL {
server.endPointCounters.cacheHits.Add(key, 1)
remote = entry.remote
return entry.value, -1, entry.lastError
}
// not in cache or too old, get the real value
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
result, _, err = server.topoServer.GetEndPoints(newCtx, cell, keyspace, shard, tabletType)
// get remote endpoints for master if enabled
if err != nil && server.enableRemoteMaster && tabletType == topodatapb.TabletType_MASTER {
remote = true
server.counts.Add(remoteQueryCategory, 1)
server.endPointCounters.remoteLookups.Add(key, 1)
var ss *topodatapb.SrvShard
ss, err = server.topoServer.GetSrvShard(newCtx, cell, keyspace, shard)
if err != nil {
server.counts.Add(remoteErrorCategory, 1)
server.endPointCounters.remoteLookupErrors.Add(key, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed to get SrvShard for remote master: %v",
newCtx, cell, keyspace, shard, tabletType, err)
} else {
if ss.MasterCell != "" && ss.MasterCell != cell {
result, _, err = server.topoServer.GetEndPoints(newCtx, ss.MasterCell, keyspace, shard, tabletType)
}
}
}
if err != nil {
server.endPointCounters.lookupErrors.Add(key, 1)
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, shard, tabletType, err)
} else {
server.counts.Add(cachedCategory, 1)
server.endPointCounters.staleCacheFallbacks.Add(key, 1)
log.Warningf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, shard, tabletType, err, entry.value, entry.lastError)
return entry.value, -1, entry.lastError
}
}
// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
entry.value = result
entry.lastError = err
entry.lastErrorCtx = newCtx
entry.remote = remote
return entry.value, -1, err
}
// The next few structures and methods are used to get a displayable
// version of the cache in a status page
@@ -632,67 +471,11 @@ func (sscsl SrvShardCacheStatusList) Swap(i, j int) {
sscsl[i], sscsl[j] = sscsl[j], sscsl[i]
}
// EndPointsCacheStatus is the current value for an EndPoints object
type EndPointsCacheStatus struct {
Cell string
Keyspace string
Shard string
TabletType topodatapb.TabletType
Value *topodatapb.EndPoints
LastError error
LastErrorCtx context.Context
}
// StatusAsHTML returns an HTML version of our status.
// It works best if there is data in the cache.
func (st *EndPointsCacheStatus) StatusAsHTML() template.HTML {
if st.Value == nil || len(st.Value.Entries) == 0 {
return template.HTML("<b>No endpoints</b>")
}
// Assemble links to individual endpoints
epLinks := "{ "
for _, ve := range st.Value.Entries {
healthColor := "red"
var vtPort int32
var ok bool
if vtPort, ok = ve.PortMap["vt"]; ok {
// EndPoint is healthy
healthColor = "green"
}
epLinks += fmt.Sprintf(
"<a href=\"http://%v:%d\" style=\"color:%v\">%v:%d</a> ",
ve.Host, vtPort, healthColor, ve.Host, vtPort)
}
epLinks += "}"
return template.HTML(fmt.Sprintf("Serving from %v healthy endpoints: %v", len(st.Value.Entries), epLinks))
}
// EndPointsCacheStatusList is used for sorting
type EndPointsCacheStatusList []*EndPointsCacheStatus
// Len is part of sort.Interface
func (epcsl EndPointsCacheStatusList) Len() int {
return len(epcsl)
}
// Less is part of sort.Interface
func (epcsl EndPointsCacheStatusList) Less(i, j int) bool {
return epcsl[i].Cell+"."+epcsl[i].Keyspace+"."+epcsl[i].Shard+"."+string(epcsl[i].TabletType) <
epcsl[j].Cell+"."+epcsl[j].Keyspace+"."+epcsl[j].Shard+"."+string(epcsl[j].TabletType)
}
// Swap is part of sort.Interface
func (epcsl EndPointsCacheStatusList) Swap(i, j int) {
epcsl[i], epcsl[j] = epcsl[j], epcsl[i]
}
// ResilientSrvTopoServerCacheStatus has the full status of the cache
type ResilientSrvTopoServerCacheStatus struct {
SrvKeyspaceNames SrvKeyspaceNamesCacheStatusList
SrvKeyspaces SrvKeyspaceCacheStatusList
SrvShards SrvShardCacheStatusList
EndPoints EndPointsCacheStatusList
}
// CacheStatus returns a displayable version of the cache
@@ -736,27 +519,12 @@ func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCache
entry.mutex.Unlock()
}
for _, entry := range server.endPointsCache {
entry.mutex.Lock()
result.EndPoints = append(result.EndPoints, &EndPointsCacheStatus{
Cell: entry.cell,
Keyspace: entry.keyspace,
Shard: entry.shard,
TabletType: entry.tabletType,
Value: entry.value,
LastError: entry.lastError,
LastErrorCtx: entry.lastErrorCtx,
})
entry.mutex.Unlock()
}
server.mutex.Unlock()
// do the sorting without the mutex
sort.Sort(result.SrvKeyspaceNames)
sort.Sort(result.SrvKeyspaces)
sort.Sort(result.SrvShards)
sort.Sort(result.EndPoints)
return result
}

View file

@@ -59,87 +59,6 @@ func (ft *fakeTopo) GetSrvShard(ctx context.Context, cell, keyspace, shard strin
}, nil
}
func (ft *fakeTopo) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (*topodatapb.EndPoints, int64, error) {
return nil, -1, fmt.Errorf("No endpoints")
}
type fakeTopoRemoteMaster struct {
fakeTopo
cell string
remoteCell string
}
func (ft *fakeTopoRemoteMaster) GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error) {
return &topodatapb.SrvShard{
Name: shard,
MasterCell: ft.remoteCell,
}, nil
}
func (ft *fakeTopoRemoteMaster) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (*topodatapb.EndPoints, int64, error) {
if cell != ft.cell && cell != ft.remoteCell {
return nil, -1, fmt.Errorf("GetEndPoints: invalid cell: %v", cell)
}
if cell == ft.cell || tabletType != topodatapb.TabletType_MASTER {
return &topodatapb.EndPoints{
Entries: []*topodatapb.EndPoint{
{
Uid: 0,
},
},
}, -1, nil
}
return &topodatapb.EndPoints{
Entries: []*topodatapb.EndPoint{
{
Uid: 1,
},
},
}, -1, nil
}
// TestRemoteMaster will test getting endpoints for remote master.
func TestRemoteMaster(t *testing.T) {
ft := &fakeTopoRemoteMaster{cell: "cell1", remoteCell: "cell2"}
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestRemoteMaster")
rsts.enableRemoteMaster = true
// remote cell for master
ep, _, err := rsts.GetEndPoints(context.Background(), "cell3", "test_ks", "1", topodatapb.TabletType_MASTER)
if err != nil {
t.Fatalf("GetEndPoints got unexpected error: %v", err)
}
if ep.Entries[0].Uid != 1 {
t.Fatalf("GetEndPoints got %v want 1", ep.Entries[0].Uid)
}
remoteQueryCount := rsts.counts.Counts()[remoteQueryCategory]
if remoteQueryCount != 1 {
t.Fatalf("Get remoteQueryCategory count got %v want 1", remoteQueryCount)
}
// no remote cell for non-master
ep, _, err = rsts.GetEndPoints(context.Background(), "cell3", "test_ks", "0", topodatapb.TabletType_REPLICA)
if err == nil {
t.Fatalf("GetEndPoints did not return an error")
}
// no remote cell for master
rsts.enableRemoteMaster = false
ep, _, err = rsts.GetEndPoints(context.Background(), "cell3", "test_ks", "2", topodatapb.TabletType_MASTER)
if err == nil {
t.Fatalf("GetEndPoints did not return an error")
}
// use cached value from above
ep, _, err = rsts.GetEndPoints(context.Background(), "cell3", "test_ks", "1", topodatapb.TabletType_MASTER)
if err != nil {
t.Fatalf("GetEndPoints got unexpected error: %v", err)
}
ep, _, err = rsts.GetEndPoints(context.Background(), "cell1", "test_ks", "1", topodatapb.TabletType_MASTER)
if ep.Entries[0].Uid != 0 {
t.Fatalf("GetEndPoints got %v want 0", ep.Entries[0].Uid)
}
}
// TestGetSrvKeyspace will test we properly return updated SrvKeyspace.
func TestGetSrvKeyspace(t *testing.T) {
ft := &fakeTopo{keyspace: "test_ks"}

View file

@@ -254,11 +254,6 @@ func (sct *sandboxTopo) GetSrvShard(ctx context.Context, cell, keyspace, shard s
return nil, fmt.Errorf("Unsupported")
}
// GetEndPoints is part of SrvTopoServer.
func (sct *sandboxTopo) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (*topodatapb.EndPoints, int64, error) {
return nil, -1, fmt.Errorf("Unsupported")
}
func sandboxDialer(ctx context.Context, endPoint *topodatapb.EndPoint, keyspace, shard string, tabletType topodatapb.TabletType, timeout time.Duration) (tabletconn.TabletConn, error) {
sand := getSandbox(keyspace)
sand.sandmu.Lock()