Merge pull request #3965 from tinyspeck/topology-watcher-avoid-recheck

Topology watcher refreshKnownTablets option
Michael Demmer 2018-05-29 09:43:57 -07:00 committed by GitHub
Parents 0c36862704 c7ed4f15d4
Commit 6b81f05657
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 252 additions and 51 deletions
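In short, this change threads a new refreshKnownTablets boolean through the topology watcher: when it is false, tablet aliases already present in the watcher's map are not re-read from the topology server on each refresh cycle, which reduces topo load at the cost of not noticing an in-place host/port change for a known alias. A minimal caller-side sketch of the new constructor signature follows (compile-only illustration; the cell name and interval are placeholders, not values taken from this change):

package example

import (
    "time"

    "vitess.io/vitess/go/vt/discovery"
    "vitess.io/vitess/go/vt/topo"
)

// buildWatcher shows the extra boolean that now sits between refreshInterval
// and topoReadConcurrency; passing false skips re-reading known tablets.
func buildWatcher(ts *topo.Server, tr discovery.TabletRecorder) *discovery.TopologyWatcher {
    return discovery.NewCellTabletsWatcher(
        ts, tr, "zone1", // placeholder cell name
        1*time.Minute,   // refreshInterval
        false,           // refreshKnownTablets: skip re-reading known tablets
        discovery.DefaultTopoReadConcurrency,
    )
}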

View file

@@ -88,13 +88,22 @@ func (c *counters) Add(name string, value int64) {
atomic.AddInt64(a, value)
}
// ResetAll resets all counter values.
// ResetAll resets all counter values and clears all keys.
func (c *counters) ResetAll() {
c.mu.Lock()
defer c.mu.Unlock()
c.counts = make(map[string]*int64)
}
// ZeroAll resets all counter values to zero
func (c *counters) ZeroAll() {
c.mu.Lock()
defer c.mu.Unlock()
for _, a := range c.counts {
atomic.StoreInt64(a, int64(0))
}
}
// Reset resets a specific counter value to 0.
func (c *counters) Reset(name string) {
a := c.getValueAddr(name)
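Side note on the stats change above: ResetAll still drops every key, while the new ZeroAll keeps the key set and only zeroes the values, which is what the watcher tests below rely on to get stable operation-count keys. A self-contained sketch of the two behaviors (an illustrative stand-in type, not the actual vitess stats package):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type demoCounters struct {
    mu     sync.Mutex
    counts map[string]*int64
}

func (c *demoCounters) Add(name string, v int64) {
    c.mu.Lock()
    a, ok := c.counts[name]
    if !ok {
        a = new(int64)
        c.counts[name] = a
    }
    c.mu.Unlock()
    atomic.AddInt64(a, v)
}

// ResetAll drops all keys along with their values.
func (c *demoCounters) ResetAll() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.counts = make(map[string]*int64)
}

// ZeroAll keeps every key but resets its value to zero.
func (c *demoCounters) ZeroAll() {
    c.mu.Lock()
    defer c.mu.Unlock()
    for _, a := range c.counts {
        atomic.StoreInt64(a, 0)
    }
}

func main() {
    c := &demoCounters{counts: make(map[string]*int64)}
    c.Add("AddTablet", 3)
    c.ZeroAll()
    fmt.Println(len(c.counts), atomic.LoadInt64(c.counts["AddTablet"])) // 1 0: key survives, value zeroed
    c.ResetAll()
    fmt.Println(len(c.counts)) // 0: key is gone
}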

View file

@@ -65,8 +65,8 @@ type TabletRecorder interface {
// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell)
})
}
@@ -74,7 +74,7 @@ func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell stri
// NewShardReplicationWatcher returns a TopologyWatcher that
// monitors the tablets in a cell/keyspace/shard, and starts refreshing.
func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, true /* refreshKnownTablets */, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
sri, err := tw.topoServer.GetShardReplication(tw.ctx, tw.cell, keyspace, shard)
switch err {
case nil:
@@ -97,6 +97,7 @@ func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell
// tabletInfo is used internally by the TopologyWatcher class
type tabletInfo struct {
alias string
key string
tablet *topodatapb.Tablet
}
@@ -105,14 +106,15 @@ type tabletInfo struct {
// the TabletRecorder AddTablet / RemoveTablet interface appropriately.
type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
tr TabletRecorder
cell string
refreshInterval time.Duration
getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)
sem chan int
ctx context.Context
cancelFunc context.CancelFunc
topoServer *topo.Server
tr TabletRecorder
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)
sem chan int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
wg sync.WaitGroup
@@ -127,15 +129,16 @@ type TopologyWatcher struct {
// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher {
func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
tr: tr,
cell: cell,
refreshInterval: refreshInterval,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tablets: make(map[string]*tabletInfo),
topoServer: topoServer,
tr: tr,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
tw.ctx, tw.cancelFunc = context.WithCancel(context.Background())
@@ -163,7 +166,9 @@ func (tw *TopologyWatcher) watch() {
func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)
tabletAlias, err := tw.getTablets(tw)
replacedTablets := make(map[string]*tabletInfo)
tabletAliases, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
@@ -175,7 +180,17 @@
log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err)
return
}
for _, tAlias := range tabletAlias {
tw.mu.Lock()
for _, tAlias := range tabletAliases {
if !tw.refreshKnownTablets {
aliasStr := topoproto.TabletAliasString(tAlias)
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
continue
}
}
wg.Add(1)
go func(alias *topodatapb.TabletAlias) {
defer wg.Done()
@@ -193,32 +208,55 @@
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
key := TabletToMapKey(tablet.Tablet)
tw.mu.Lock()
newTablets[key] = &tabletInfo{
alias: topoproto.TabletAliasString(alias),
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
key: TabletToMapKey(tablet.Tablet),
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
}
tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()
for key, tep := range newTablets {
if val, ok := tw.tablets[key]; !ok {
tw.tr.AddTablet(tep.tablet, tep.alias)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
} else if val.alias != tep.alias {
tw.tr.ReplaceTablet(val.tablet, tep.tablet, tep.alias)
for alias, newVal := range newTablets {
if val, ok := tw.tablets[alias]; !ok {
// Check if there's a tablet with the same address key but a
// different alias. If so, replace it and keep track of the
// replaced alias to make sure it isn't removed later.
found := false
for _, otherVal := range tw.tablets {
if newVal.key == otherVal.key {
found = true
tw.tr.ReplaceTablet(otherVal.tablet, newVal.tablet, alias)
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
replacedTablets[otherVal.alias] = newVal
}
}
if !found {
tw.tr.AddTablet(newVal.tablet, alias)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
}
} else if val.key != newVal.key {
// Handle the case where the same tablet alias is now reporting
// a different address key.
replacedTablets[alias] = newVal
tw.tr.ReplaceTablet(val.tablet, newVal.tablet, alias)
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
}
for key, tep := range tw.tablets {
if _, ok := newTablets[key]; !ok {
tw.tr.RemoveTablet(tep.tablet)
topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1)
for _, val := range tw.tablets {
if _, ok := newTablets[val.alias]; !ok {
if _, ok2 := replacedTablets[val.alias]; !ok2 {
tw.tr.RemoveTablet(val.tablet)
topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1)
}
}
}
tw.tablets = newTablets
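The net effect of the loadTablets rewrite: the watcher's map is now keyed by tablet alias rather than by address key, ReplaceTablet is used when either the alias or the address changes, and when refreshKnownTablets is false a known alias is carried over from the previous map without a GetTablet call. A condensed, illustrative sketch of that last decision (simplified; it ignores the concurrency and the Replace/Remove bookkeeping shown above):

package main

import "fmt"

// refreshPlan is an illustrative sketch (not the loadTablets code above) of which
// aliases get a GetTablet call during one refresh cycle. With refreshKnownTablets
// set to false, an alias already in the previous map keeps its cached record, so a
// host/port change for that alias is not detected until the alias itself is
// removed and re-added.
func refreshPlan(refreshKnownTablets bool, known map[string]bool, aliases []string) (reuse, fetch []string) {
    for _, alias := range aliases {
        if !refreshKnownTablets && known[alias] {
            reuse = append(reuse, alias) // carry the cached record over, skip the topo read
            continue
        }
        fetch = append(fetch, alias) // read the tablet record from topo
    }
    return reuse, fetch
}

func main() {
    known := map[string]bool{"aa-0000000001": true}
    reuse, fetch := refreshPlan(false, known, []string{"aa-0000000001", "aa-0000000002"})
    fmt.Println(reuse, fetch) // [aa-0000000001] [aa-0000000002]: only the new alias is fetched
}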

View file

@@ -22,24 +22,53 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
)
func checkOpCounts(t *testing.T, tw *TopologyWatcher, prevCounts, deltas map[string]int64) map[string]int64 {
t.Helper()
newCounts := topologyWatcherOperations.Counts()
for key, prevVal := range prevCounts {
delta, ok := deltas[key]
if !ok {
delta = 0
}
newVal, ok := newCounts[key]
if !ok {
newVal = 0
}
if newVal != prevVal+delta {
t.Errorf("expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal)
}
}
return newCounts
}
func TestCellTabletsWatcher(t *testing.T) {
checkWatcher(t, true)
checkWatcher(t, true, true)
}
func TestCellTabletsWatcherNoRefreshKnown(t *testing.T) {
checkWatcher(t, true, false)
}
func TestShardReplicationWatcher(t *testing.T) {
checkWatcher(t, false)
checkWatcher(t, false, true)
}
func checkWatcher(t *testing.T, cellTablets bool) {
func checkWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
ts := memorytopo.NewServer("aa")
fhc := NewFakeHealthCheck()
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
var tw *TopologyWatcher
if cellTablets {
tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, 5)
tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, refreshKnownTablets, 5)
} else {
tw = NewShardReplicationWatcher(ts, fhc, "aa", "keyspace", "shard", 10*time.Minute, 5)
}
@@ -50,6 +79,7 @@ func checkWatcher(t *testing.T, cellTablets bool) {
if err := tw.WaitForInitialTopology(); err != nil {
t.Fatalf("initial WaitForInitialTopology failed")
}
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1})
// Add a tablet to the topology.
tablet := &topodatapb.Tablet{
@@ -68,6 +98,7 @@ func checkWatcher(t *testing.T, cellTablets bool) {
t.Fatalf("CreateTablet failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
@@ -76,8 +107,56 @@ func checkWatcher(t *testing.T, cellTablets bool) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
}
// Add a second tablet to the topology.
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 2,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "keyspace",
Shard: "shard",
}
if err := ts.CreateTablet(context.Background(), tablet2); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
}
tw.loadTablets()
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
} else {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
}
// Check the new tablet is returned by GetAllTablets().
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet2)
if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2)
}
// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo
tw.loadTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2})
} else {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1})
}
// same tablet, different port, should update (previous
// one should go away, new one be added).
// one should go away, new one be added)
//
// if refreshKnownTablets is disabled, this case is *not*
// detected and the tablet remains in the topo using the
// old key
origTablet := proto.Clone(tablet).(*topodatapb.Tablet)
origKey := TabletToMapKey(tablet)
tablet.PortMap["vt"] = 456
if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error {
t.PortMap["vt"] = 456
@@ -88,25 +167,99 @@ func checkWatcher(t *testing.T, cellTablets bool) {
tw.loadTablets()
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
if refreshKnownTablets {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1})
if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
}
if _, ok := allTablets[origKey]; ok {
t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey)
}
} else {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1})
if _, ok := allTablets[origKey]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[origKey], origTablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, origTablet)
}
if _, ok := allTablets[key]; ok {
t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key)
}
}
// Remove and re-add with a new uid. This should trigger a ReplaceTablet in loadTablets,
// because the uid does not match.
// Remove the second tablet and re-add with a new uid. This should
// trigger a ReplaceTablet in loadTablets because the uid does not
// match.
//
// This case *is* detected even if refreshKnownTablets is false
// because the delete tablet / create tablet sequence causes the
// list of tablets to change and therefore the change is detected.
if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil {
t.Fatalf("DeleteTablet failed: %v", err)
}
tablet2.Alias.Uid = 3
if err := ts.CreateTablet(context.Background(), tablet2); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
}
if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil {
t.Fatalf("FixShardReplication failed: %v", err)
}
tw.loadTablets()
allTablets = fhc.GetAllTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1})
} else {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "ReplaceTablet": 1})
}
key = TabletToMapKey(tablet2)
if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) {
t.Errorf("fhc.GetAllTablets() = %+v; want %v => %+v", allTablets, key, tablet2)
}
// Remove the tablet and check that it is detected as being gone.
if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil {
t.Fatalf("DeleteTablet failed: %v", err)
}
tablet.Alias.Uid = 1
if err := ts.CreateTablet(context.Background(), tablet); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil {
t.Fatalf("FixShardReplication failed: %v", err)
}
tw.loadTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1})
} else {
counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1})
}
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
if _, ok := allTablets[key]; ok || len(allTablets) != 1 {
t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key)
}
key = TabletToMapKey(tablet2)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet2) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2)
}
// Remove the other and check that it is detected as being gone.
if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil {
t.Fatalf("DeleteTablet failed: %v", err)
}
if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil {
t.Fatalf("FixShardReplication failed: %v", err)
}
tw.loadTablets()
checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet)
if _, ok := allTablets[key]; ok || len(allTablets) != 0 {
t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key)
}
key = TabletToMapKey(tablet2)
if _, ok := allTablets[key]; ok || len(allTablets) != 0 {
t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key)
}
tw.Stop()

View file

@@ -51,7 +51,7 @@ func newRealtimeStats(ts *topo.Server) (*realtimeStats, error) {
}
var watchers []*discovery.TopologyWatcher
for _, cell := range cells {
watcher := discovery.NewCellTabletsWatcher(ts, hc, cell, *vtctl.HealthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
watcher := discovery.NewCellTabletsWatcher(ts, hc, cell, *vtctl.HealthCheckTopologyRefresh, true /* refreshKnownTablets */, discovery.DefaultTopoReadConcurrency)
watchers = append(watchers, watcher)
}
r.cellWatchers = watchers

View file

@@ -46,6 +46,7 @@ var (
cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets")
tabletFilters flagutil.StringListValue
refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval")
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
allowedTabletTypes []topodatapb.TabletType
)
@@ -116,7 +117,7 @@ func createDiscoveryGateway(hc discovery.HealthCheck, serv srvtopo.Server, cell
tr = fbs
}
ctw := discovery.NewCellTabletsWatcher(topoServer, tr, c, *refreshInterval, *topoReadConcurrency)
ctw := discovery.NewCellTabletsWatcher(topoServer, tr, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency)
dg.tabletsWatchers = append(dg.tabletsWatchers, ctw)
}
dg.QueryService = queryservice.Wrap(nil, dg.withRetry)
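Operationally, the new vtgate flag defaults to true, so existing deployments keep the old behavior of re-reading every tablet record on each refresh; passing -tablet_refresh_known_tablets=false (alongside the existing -cells_to_watch and -tablet_refresh_interval flags) opts the discovery gateway into the cheaper list-only refresh described above. The vtctld realtime stats watcher, like the shard replication watcher, remains hard-wired to refreshKnownTablets=true.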