Merge pull request #1433 from youtube/vtgate

Watch SrvKeyspace instead of polling in VTGate.
This commit is contained in:
Liang 2016-01-14 13:49:49 -08:00
Родитель af9033d86f 100ec29ac7
Коммит 8493d976f1
2 изменённых файлов: 194 добавлений и 57 удалений

Просмотреть файл

@ -48,7 +48,7 @@ type ResilientSrvTopoServer struct {
// mutex protects the cache map itself, not the individual
// values in the cache.
mutex sync.Mutex
mutex sync.RWMutex
srvKeyspaceNamesCache map[string]*srvKeyspaceNamesEntry
srvKeyspaceCache map[string]*srvKeyspaceEntry
srvShardCache map[string]*srvShardEntry
@ -108,12 +108,11 @@ type srvKeyspaceEntry struct {
keyspace string
// the mutex protects any access to this structure (read or write)
mutex sync.Mutex
mutex sync.RWMutex
insertionTime time.Time
value *topodatapb.SrvKeyspace
lastError error
lastErrorCtx context.Context
value *topodatapb.SrvKeyspace
lastError error
lastErrorCtx context.Context
}
type srvShardEntry struct {
@ -257,14 +256,19 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, c
return result, err
}
// GetSrvKeyspace returns SrvKeyspace object for the given cell and keyspace.
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
server.counts.Add(queryCategory, 1)
func (server *ResilientSrvTopoServer) getSrvKeyspaceEntry(cell, keyspace string) *srvKeyspaceEntry {
// find the entry in the cache, add it if not there
key := cell + "." + keyspace
server.mutex.Lock()
server.mutex.RLock()
entry, ok := server.srvKeyspaceCache[key]
if ok {
server.mutex.RUnlock()
return entry
}
server.mutex.RUnlock()
server.mutex.Lock()
entry, ok = server.srvKeyspaceCache[key]
if !ok {
entry = &srvKeyspaceEntry{
cell: cell,
@ -273,6 +277,21 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell,
server.srvKeyspaceCache[key] = entry
}
server.mutex.Unlock()
return entry
}
// GetSrvKeyspace returns SrvKeyspace object for the given cell and keyspace.
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
entry := server.getSrvKeyspaceEntry(cell, keyspace)
// If the entry exists, return it
entry.mutex.RLock()
if entry.value != nil {
v := entry.value
entry.mutex.RUnlock()
return v, nil
}
entry.mutex.RUnlock()
// Lock the entry, and do everything holding the lock. This
// means two concurrent requests will only issue one
@ -280,33 +299,47 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell,
entry.mutex.Lock()
defer entry.mutex.Unlock()
// If the entry is fresh enough, return it
if time.Now().Sub(entry.insertionTime) < server.cacheTTL {
return entry.value, entry.lastError
// If the entry exists, return it
if entry.value != nil {
return entry.value, nil
}
// not in cache or too old, get the real value
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
// not in cache, get the real value
newCtx := context.Background()
result, err := server.topoServer.GetSrvKeyspace(newCtx, cell, keyspace)
// start watching
notifications, stopWatching, err := server.topoServer.WatchSrvKeyspace(newCtx, cell, keyspace)
if err != nil {
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetSrvKeyspace(%v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvKeyspace(%v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
entry.lastError = err
entry.lastErrorCtx = ctx
log.Errorf("WatchSrvKeyspace failed for %v/%v: %v", cell, keyspace, err)
return nil, entry.lastError
}
sk, ok := <-notifications
if !ok {
entry.lastError = fmt.Errorf("failed to receive from channel")
entry.lastErrorCtx = ctx
log.Errorf("WatchSrvKeyspace first result failed for %v/%v", cell, keyspace)
close(stopWatching)
return nil, entry.lastError
}
// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
entry.value = result
entry.lastError = err
entry.lastErrorCtx = newCtx
return result, err
// cache the first notification
entry.value = sk
entry.lastError = nil
entry.lastErrorCtx = nil
go func() {
for sk := range notifications {
entry.mutex.Lock()
entry.value = sk
entry.mutex.Unlock()
}
log.Errorf("failed to receive from channel")
close(stopWatching)
}()
return entry.value, nil
}
// GetSrvShard returns SrvShard object for the given cell, keyspace, and shard.
@ -705,7 +738,7 @@ func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCache
}
for _, entry := range server.srvKeyspaceCache {
entry.mutex.Lock()
entry.mutex.RLock()
result.SrvKeyspaces = append(result.SrvKeyspaces, &SrvKeyspaceCacheStatus{
Cell: entry.cell,
Keyspace: entry.keyspace,
@ -713,7 +746,7 @@ func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCache
LastError: entry.lastError,
LastErrorCtx: entry.lastErrorCtx,
})
entry.mutex.Unlock()
entry.mutex.RUnlock()
}
for _, entry := range server.srvShardCache {

Просмотреть файл

@ -8,7 +8,9 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/test/faketopo"
"golang.org/x/net/context"
@ -193,20 +195,43 @@ func TestFilterUnhealthy(t *testing.T) {
// returns errors for everything, except the one keyspace.
type fakeTopo struct {
faketopo.FakeTopo
keyspace string
callCount int
keyspace string
callCount int
notifications chan *topodatapb.SrvKeyspace
stopWatching chan struct{}
}
// GetSrvKeyspaceNames ignores ctx and cell and always returns a
// one-element list holding ft.keyspace, with a nil error.
func (ft *fakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
return []string{ft.keyspace}, nil
}
func (ft *fakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
// UpdateSrvKeyspace forwards srvKeyspace onto ft.notifications when the
// requested keyspace is the one this fake knows about; any other keyspace
// is rejected with an error and nothing is sent. ctx and cell are unused.
func (ft *fakeTopo) UpdateSrvKeyspace(ctx context.Context, cell, keyspace string, srvKeyspace *topodatapb.SrvKeyspace) error {
	if keyspace == ft.keyspace {
		// Known keyspace: publish the new value on the notifications channel.
		ft.notifications <- srvKeyspace
		return nil
	}
	return fmt.Errorf("Unknown keyspace")
}
func (ft *fakeTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, chan<- struct{}, error) {
ft.callCount++
if keyspace == ft.keyspace {
return &topodatapb.SrvKeyspace{}, nil
ft.notifications = make(chan *topodatapb.SrvKeyspace, 10)
ft.stopWatching = make(chan struct{})
ft.notifications <- &topodatapb.SrvKeyspace{}
return ft.notifications, ft.stopWatching, nil
}
return nil, fmt.Errorf("Unknown keyspace")
return nil, nil, fmt.Errorf("Unknown keyspace")
}
// GetSrvShard counts the call in ft.callCount, then returns a SrvShard
// whose Name is the requested shard when keyspace matches ft.keyspace,
// or an error for any other keyspace. ctx and cell are unused.
func (ft *fakeTopo) GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error) {
	ft.callCount++
	if keyspace == ft.keyspace {
		srvShard := &topodatapb.SrvShard{Name: shard}
		return srvShard, nil
	}
	return nil, fmt.Errorf("Unknown keyspace")
}
func (ft *fakeTopo) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (*topodatapb.EndPoints, int64, error) {
@ -292,11 +317,73 @@ func TestRemoteMaster(t *testing.T) {
}
}
// TestGetSrvKeyspace will test we properly return updated SrvKeyspace.
//
// It calls GetSrvKeyspace once for the known keyspace, pushes a new value
// through the fake topo with UpdateSrvKeyspace, and then polls
// GetSrvKeyspace until it returns the new value or a five-second deadline
// passes.
func TestGetSrvKeyspace(t *testing.T) {
	ft := &fakeTopo{keyspace: "test_ks"}
	rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestGetSrvKeyspace")

	// ask for the known keyspace, that populates the cache
	_, err := rsts.GetSrvKeyspace(context.Background(), "", "test_ks")
	if err != nil {
		t.Fatalf("GetSrvKeyspace got unexpected error: %v", err)
	}

	// update srvkeyspace with new value
	want := &topodatapb.SrvKeyspace{
		ShardingColumnName: "id",
		ShardingColumnType: topodatapb.KeyspaceIdType_UINT64,
	}
	if err := ft.UpdateSrvKeyspace(context.Background(), "", "test_ks", want); err != nil {
		t.Fatalf("UpdateSrvKeyspace got unexpected error: %v", err)
	}

	// Poll until the new value is returned. The deadline is re-evaluated
	// against the current time on every iteration so that the loop really
	// stops after five seconds instead of spinning forever when the update
	// never shows up.
	var got *topodatapb.SrvKeyspace
	expiry := time.Now().Add(5 * time.Second)
	for time.Now().Before(expiry) {
		got, err = rsts.GetSrvKeyspace(context.Background(), "", "test_ks")
		if err != nil {
			t.Fatalf("GetSrvKeyspace got unexpected error: %v", err)
		}
		if proto.Equal(want, got) {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if !proto.Equal(want, got) {
		t.Fatalf("GetSrvKeyspace() = %+v, want %+v", got, want)
	}
}
// TestCacheWithErrors checks that GetSrvShard keeps succeeding for a
// keyspace that was fetched once, even after the fake topo has been
// switched to a different keyspace, both with the default TTL and with
// the TTL forced to zero.
func TestCacheWithErrors(t *testing.T) {
	ft := &fakeTopo{keyspace: "test_ks"}
	rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestCacheWithErrors")

	// First request for the known keyspace; this populates the cache.
	if _, err := rsts.GetSrvShard(context.Background(), "", "test_ks", "shard_0"); err != nil {
		t.Fatalf("GetSrvShard got unexpected error: %v", err)
	}

	// Make the fake topo reject "test_ks" from now on. The next request
	// is expected to be answered from the cache without an error.
	ft.keyspace = "another_test_ks"
	if _, err := rsts.GetSrvShard(context.Background(), "", "test_ks", "shard_0"); err != nil {
		t.Fatalf("GetSrvShard got unexpected error: %v", err)
	}

	// Drop the TTL to zero so the cached entry no longer counts as fresh,
	// and ask once more; the call must still not return an error.
	rsts.cacheTTL = 0
	if _, err := rsts.GetSrvShard(context.Background(), "", "test_ks", "shard_0"); err != nil {
		t.Fatalf("GetSrvShard got unexpected error: %v", err)
	}
}
// TestSrvKeyspaceCacheWithErrors will test we properly return cached errors for GetSrvKeyspace.
func TestSrvKeyspaceCacheWithErrors(t *testing.T) {
ft := &fakeTopo{keyspace: "test_ks"}
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestSrvKeyspaceCacheWithErrors")
// ask for the known keyspace, that populates the cache
_, err := rsts.GetSrvKeyspace(context.Background(), "", "test_ks")
if err != nil {
@ -305,14 +392,7 @@ func TestCacheWithErrors(t *testing.T) {
// now make the topo server fail, and ask again, should get cached
// value, not even ask underlying guy
ft.keyspace = "another_test_ks"
_, err = rsts.GetSrvKeyspace(context.Background(), "", "test_ks")
if err != nil {
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err)
}
// now reduce TTL to nothing, so we won't use cache, and ask again
rsts.cacheTTL = 0
close(ft.notifications)
_, err = rsts.GetSrvKeyspace(context.Background(), "", "test_ks")
if err != nil {
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err)
@ -324,6 +404,40 @@ func TestCachedErrors(t *testing.T) {
ft := &fakeTopo{keyspace: "test_ks"}
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestCachedErrors")
// ask for an unknown keyspace, should get an error
_, err := rsts.GetSrvShard(context.Background(), "", "unknown_ks", "shard_0")
if err == nil {
t.Fatalf("First GetSrvShard didn't return an error")
}
if ft.callCount != 1 {
t.Fatalf("GetSrvShard didn't get called 1 but %v times", ft.callCount)
}
// ask again, should get an error and use cache
_, err = rsts.GetSrvShard(context.Background(), "", "unknown_ks", "shard_0")
if err == nil {
t.Fatalf("Second GetSrvShard didn't return an error")
}
if ft.callCount != 1 {
t.Fatalf("GetSrvShard was called again: %v times", ft.callCount)
}
// ask again after expired cache, should get an error
rsts.cacheTTL = 0
_, err = rsts.GetSrvShard(context.Background(), "", "unknown_ks", "shard_0")
if err == nil {
t.Fatalf("Third GetSrvShard didn't return an error")
}
if ft.callCount != 2 {
t.Fatalf("GetSrvShard was not called again: %v times", ft.callCount)
}
}
// TestSrvKeyspaceCachedErrors will test we properly return cached errors for SrvKeyspace.
func TestSrvKeyspaceCachedErrors(t *testing.T) {
ft := &fakeTopo{keyspace: "test_ks"}
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestSrvKeyspaceCachedErrors")
// ask for an unknown keyspace, should get an error
_, err := rsts.GetSrvKeyspace(context.Background(), "", "unknown_ks")
if err == nil {
@ -338,16 +452,6 @@ func TestCachedErrors(t *testing.T) {
if err == nil {
t.Fatalf("Second GetSrvKeyspace didn't return an error")
}
if ft.callCount != 1 {
t.Fatalf("GetSrvKeyspace was called again: %v times", ft.callCount)
}
// ask again after expired cache, should get an error
rsts.cacheTTL = 0
_, err = rsts.GetSrvKeyspace(context.Background(), "", "unknown_ks")
if err == nil {
t.Fatalf("Third GetSrvKeyspace didn't return an error")
}
if ft.callCount != 2 {
t.Fatalf("GetSrvKeyspace was not called again: %v times", ft.callCount)
}