From 3b8a3d5029d3f994770a8cc756c326a3230e0019 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sun, 20 May 2018 21:34:40 -0700 Subject: [PATCH 1/5] improve the topology watcher tests Using the newly added counters, add verification that the various operation counts occur as expected. This required also adding calls to topo.FixShardReplication in the tests to avoid differences in the operation counts between the two types of topology watchers. Signed-off-by: Michael Demmer --- go/vt/discovery/topology_watcher_test.go | 104 ++++++++++++++++++++++- 1 file changed, 102 insertions(+), 2 deletions(-) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 0dce5659f3..650285f801 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -22,10 +22,32 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" + "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" ) +func checkOpCounts(t *testing.T, tw *TopologyWatcher, prevCounts, deltas map[string]int64) map[string]int64 { + t.Helper() + newCounts := topologyWatcherOperations.Counts() + for key, prevVal := range prevCounts { + delta, ok := deltas[key] + if !ok { + delta = 0 + } + newVal, ok := newCounts[key] + if !ok { + newVal = 0 + } + + if newVal != prevVal+delta { + t.Errorf("expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal) + } + } + return newCounts +} + func TestCellTabletsWatcher(t *testing.T) { checkWatcher(t, true) } @@ -37,6 +59,9 @@ func TestShardReplicationWatcher(t *testing.T) { func checkWatcher(t *testing.T, cellTablets bool) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck() + logger := logutil.NewMemoryLogger() + topologyWatcherOperations.ZeroAll() + counts := topologyWatcherOperations.Counts() var tw *TopologyWatcher if cellTablets { tw = 
NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, 5) @@ -50,6 +75,7 @@ func checkWatcher(t *testing.T, cellTablets bool) { if err := tw.WaitForInitialTopology(); err != nil { t.Fatalf("initial WaitForInitialTopology failed") } + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1}) // Add a tablet to the topology. tablet := &topodatapb.Tablet{ @@ -68,6 +94,7 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("CreateTablet failed: %v", err) } tw.loadTablets() + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) // Check the tablet is returned by GetAllTablets(). allTablets := fhc.GetAllTablets() @@ -76,6 +103,32 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) } + // Add a second tablet to the topology. + tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 2, + }, + Hostname: "host2", + PortMap: map[string]int32{ + "vt": 789, + }, + Keyspace: "keyspace", + Shard: "shard", + } + if err := ts.CreateTablet(context.Background(), tablet2); err != nil { + t.Fatalf("CreateTablet failed: %v", err) + } + tw.loadTablets() + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) + + // Check the tablet is returned by GetAllTablets(). + allTablets = fhc.GetAllTablets() + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) + } + // same tablet, different port, should update (previous // one should go away, new one be added). 
tablet.PortMap["vt"] = 456 @@ -86,9 +139,11 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("UpdateTabletFields failed: %v", err) } tw.loadTablets() + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1, "RemoveTablet": 1}) + allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) } @@ -101,14 +156,59 @@ func checkWatcher(t *testing.T, cellTablets bool) { if err := ts.CreateTablet(context.Background(), tablet); err != nil { t.Fatalf("CreateTablet failed: %v", err) } + if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { + t.Fatalf("FixShardReplication failed: %v", err) + } tw.loadTablets() + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) } + // Remove and check that it is detected as being gone. 
+ if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { + t.Fatalf("DeleteTablet failed: %v", err) + } + if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { + t.Fatalf("FixShardReplication failed: %v", err) + } + tw.loadTablets() + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) + + allTablets = fhc.GetAllTablets() + key = TabletToMapKey(tablet) + if _, ok := allTablets[key]; ok || len(allTablets) != 1 { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet2) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) + } + + // Remove the other and check that it is detected as being gone. + if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { + t.Fatalf("DeleteTablet failed: %v", err) + } + if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { + t.Fatalf("FixShardReplication failed: %v", err) + } + tw.loadTablets() + checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) + + allTablets = fhc.GetAllTablets() + key = TabletToMapKey(tablet) + if _, ok := allTablets[key]; ok || len(allTablets) != 0 { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; ok || len(allTablets) != 0 { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } + tw.Stop() } From 67a94906abe8692b50c9b001be0e944fe17a3d34 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sun, 20 May 2018 21:41:06 -0700 Subject: [PATCH 2/5] add a counters.ZeroAll helper method for tests Unlike ResetAll, ZeroAll keeps all the same keys in the map but changes all the values to zero. 
Signed-off-by: Michael Demmer --- go/stats/counters.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/go/stats/counters.go b/go/stats/counters.go index bc22dd7a2c..07e7880d58 100644 --- a/go/stats/counters.go +++ b/go/stats/counters.go @@ -88,13 +88,22 @@ func (c *counters) Add(name string, value int64) { atomic.AddInt64(a, value) } -// ResetAll resets all counter values. +// ResetAll resets all counter values and clears all keys. func (c *counters) ResetAll() { c.mu.Lock() defer c.mu.Unlock() c.counts = make(map[string]*int64) } +// ZeroAll resets all counter values to zero +func (c *counters) ZeroAll() { + c.mu.Lock() + defer c.mu.Unlock() + for _, a := range c.counts { + atomic.StoreInt64(a, int64(0)) + } +} + // Reset resets a specific counter value to 0. func (c *counters) Reset(name string) { a := c.getValueAddr(name) From 48e7d3274fc4ca7da05ccf4fab9a1c9459112b14 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sun, 20 May 2018 21:47:10 -0700 Subject: [PATCH 3/5] rework topology watcher to track tablets by alias not address key Instead of tracking all the tablets by the TabletToMapKey value, use the alias as the key to all the data structures used in the scan comparisons. This change mostly doesn't change the behavior at all, with one exception when a tablet with a known alias changes the value of its address key. Previously the watcher would call AddTablet, then RemoveTablet, now it explicitly calls ReplaceTablet, which has the same net effect and seems more correct. 
Signed-off-by: Michael Demmer --- go/vt/discovery/topology_watcher.go | 55 +++++++++++++++++------- go/vt/discovery/topology_watcher_test.go | 2 +- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 020251d4d2..1755f5ab45 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -97,6 +97,7 @@ func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell // tabletInfo is used internally by the TopologyWatcher class type tabletInfo struct { alias string + key string tablet *topodatapb.Tablet } @@ -163,7 +164,9 @@ func (tw *TopologyWatcher) watch() { func (tw *TopologyWatcher) loadTablets() { var wg sync.WaitGroup newTablets := make(map[string]*tabletInfo) - tabletAlias, err := tw.getTablets(tw) + replacedTablets := make(map[string]*tabletInfo) + + tabletAliases, err := tw.getTablets(tw) topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1) @@ -175,7 +178,7 @@ func (tw *TopologyWatcher) loadTablets() { log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) return } - for _, tAlias := range tabletAlias { + for _, tAlias := range tabletAliases { wg.Add(1) go func(alias *topodatapb.TabletAlias) { defer wg.Done() @@ -193,10 +196,11 @@ func (tw *TopologyWatcher) loadTablets() { log.Errorf("cannot get tablet for alias %v: %v", alias, err) return } - key := TabletToMapKey(tablet.Tablet) tw.mu.Lock() - newTablets[key] = &tabletInfo{ - alias: topoproto.TabletAliasString(alias), + aliasStr := topoproto.TabletAliasString(alias) + newTablets[aliasStr] = &tabletInfo{ + alias: aliasStr, + key: TabletToMapKey(tablet.Tablet), tablet: tablet.Tablet, } tw.mu.Unlock() @@ -205,20 +209,41 @@ func (tw *TopologyWatcher) loadTablets() { wg.Wait() tw.mu.Lock() - for key, tep := range newTablets { - if val, ok := tw.tablets[key]; !ok { - 
tw.tr.AddTablet(tep.tablet, tep.alias) - topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) - } else if val.alias != tep.alias { - tw.tr.ReplaceTablet(val.tablet, tep.tablet, tep.alias) + for alias, newVal := range newTablets { + if val, ok := tw.tablets[alias]; !ok { + // Check if there's a tablet with the same address key but a + // different alias. If so, replace it and keep track of the + // replaced alias to make sure it isn't removed later. + found := false + for _, otherVal := range tw.tablets { + if newVal.key == otherVal.key { + found = true + tw.tr.ReplaceTablet(otherVal.tablet, newVal.tablet, alias) + topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) + replacedTablets[otherVal.alias] = newVal + } + } + if !found { + tw.tr.AddTablet(newVal.tablet, alias) + topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) + } + + } else if val.key != newVal.key { + // Handle the case where the same tablet alias is now reporting + // a different address key. + replacedTablets[alias] = newVal + tw.tr.ReplaceTablet(val.tablet, newVal.tablet, alias) topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } - for key, tep := range tw.tablets { - if _, ok := newTablets[key]; !ok { - tw.tr.RemoveTablet(tep.tablet) - topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1) + + for _, val := range tw.tablets { + if _, ok := newTablets[val.alias]; !ok { + if _, ok2 := replacedTablets[val.alias]; !ok2 { + tw.tr.RemoveTablet(val.tablet) + topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1) + } } } tw.tablets = newTablets diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 650285f801..89057a9e6f 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -139,7 +139,7 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("UpdateTabletFields failed: %v", err) } tw.loadTablets() - counts = checkOpCounts(t, tw, counts, 
map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1, "RemoveTablet": 1}) + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) From 7104fbdc620050265e8410955f0528f76033349b Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Tue, 22 May 2018 08:28:53 -0700 Subject: [PATCH 4/5] add a topology watcher option for refreshing known tablets Add a refreshKnownTablets option for the TopologyWatcher and a corresponding flag in discovery gateway. The default behavior is unchanged which means that each vtgate will periodically re-read the TabletInfo record for each tablet in case the address/port map changes. However the new flag can disable these queries for environments in which the association between a tablet alias and the host/port map never changes. This greatly reduces the load on the topo service since most of the k/v requests are for refreshing the TabletInfo and there's no efficient way to watch for this data. Signed-off-by: Michael Demmer --- go/vt/discovery/topology_watcher.go | 51 +++++++----- go/vt/discovery/topology_watcher_test.go | 101 +++++++++++++++++------ go/vt/vtgate/gateway/discoverygateway.go | 3 +- 3 files changed, 111 insertions(+), 44 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 1755f5ab45..67ca271d5e 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -65,8 +65,8 @@ type TabletRecorder interface { // NewCellTabletsWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. 
-func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { +func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { + return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell) }) } @@ -74,7 +74,7 @@ func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell stri // NewShardReplicationWatcher returns a TopologyWatcher that // monitors the tablets in a cell/keyspace/shard, and starts refreshing. func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { + return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, true /* refreshKnownTablets */, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { sri, err := tw.topoServer.GetShardReplication(tw.ctx, tw.cell, keyspace, shard) switch err { case nil: @@ -106,14 +106,15 @@ type tabletInfo struct { // the TabletRecorder AddTablet / RemoveTablet interface appropriately. 
type TopologyWatcher struct { // set at construction time - topoServer *topo.Server - tr TabletRecorder - cell string - refreshInterval time.Duration - getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) - sem chan int - ctx context.Context - cancelFunc context.CancelFunc + topoServer *topo.Server + tr TabletRecorder + cell string + refreshInterval time.Duration + refreshKnownTablets bool + getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) + sem chan int + ctx context.Context + cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. wg sync.WaitGroup @@ -128,15 +129,16 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. -func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher { +func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher { tw := &TopologyWatcher{ - topoServer: topoServer, - tr: tr, - cell: cell, - refreshInterval: refreshInterval, - getTablets: getTablets, - sem: make(chan int, topoReadConcurrency), - tablets: make(map[string]*tabletInfo), + topoServer: topoServer, + tr: tr, + cell: cell, + refreshInterval: refreshInterval, + refreshKnownTablets: refreshKnownTablets, + getTablets: getTablets, + sem: make(chan int, topoReadConcurrency), + tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) tw.ctx, tw.cancelFunc = context.WithCancel(context.Background()) @@ -178,7 +180,17 @@ func (tw *TopologyWatcher) loadTablets() { log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) return } + + tw.mu.Lock() for 
_, tAlias := range tabletAliases { + if !tw.refreshKnownTablets { + aliasStr := topoproto.TabletAliasString(tAlias) + if val, ok := tw.tablets[aliasStr]; ok { + newTablets[aliasStr] = val + continue + } + } + wg.Add(1) go func(alias *topodatapb.TabletAlias) { defer wg.Done() @@ -207,6 +219,7 @@ func (tw *TopologyWatcher) loadTablets() { }(tAlias) } + tw.mu.Unlock() wg.Wait() tw.mu.Lock() diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 89057a9e6f..56ae63d727 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -49,14 +49,18 @@ func checkOpCounts(t *testing.T, tw *TopologyWatcher, prevCounts, deltas map[str } func TestCellTabletsWatcher(t *testing.T) { - checkWatcher(t, true) + checkWatcher(t, true, true) +} + +func TestCellTabletsWatcherNoRefreshKnown(t *testing.T) { + checkWatcher(t, true, false) } func TestShardReplicationWatcher(t *testing.T) { - checkWatcher(t, false) + checkWatcher(t, false, true) } -func checkWatcher(t *testing.T, cellTablets bool) { +func checkWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck() logger := logutil.NewMemoryLogger() @@ -64,7 +68,7 @@ func checkWatcher(t *testing.T, cellTablets bool) { counts := topologyWatcherOperations.Counts() var tw *TopologyWatcher if cellTablets { - tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, 5) + tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, refreshKnownTablets, 5) } else { tw = NewShardReplicationWatcher(ts, fhc, "aa", "keyspace", "shard", 10*time.Minute, 5) } @@ -120,17 +124,39 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("CreateTablet failed: %v", err) } tw.loadTablets() - counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) - // Check the tablet is returned by GetAllTablets(). 
+ // If refreshKnownTablets is disabled, only the new tablet is read + // from the topo + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + } + + // Check the new tablet is returned by GetAllTablets(). allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet2) if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) } + // Load the tablets again to show that when refreshKnownTablets is disabled, + // only the list is read from the topo + tw.loadTablets() + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1}) + } + // same tablet, different port, should update (previous - // one should go away, new one be added). 
+ // one should go away, new one be added) + // + // if refreshKnownTablets is disabled, this case is *not* + // detected and the tablet remains in the topo using the + // old key + origTablet := proto.Clone(tablet).(*topodatapb.Tablet) + origKey := TabletToMapKey(tablet) tablet.PortMap["vt"] = 456 if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { t.PortMap["vt"] = 456 @@ -139,37 +165,60 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("UpdateTabletFields failed: %v", err) } tw.loadTablets() - counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) - allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + } + if _, ok := allTablets[origKey]; ok { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) + } + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1}) + + if _, ok := allTablets[origKey]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[origKey], origTablet) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, origTablet) + } + if _, ok := allTablets[key]; ok { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } } - // Remove and re-add with a new uid. This should trigger a ReplaceTablet in loadTablets, - // because the uid does not match. 
- if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { + // Remove the second tablet and re-add with a new uid. This should + // trigger a ReplaceTablet in loadTablets because the uid does not + // match. + // + // This case *is* detected even if refreshKnownTablets is false + // because the delete tablet / create tablet sequence causes the + // list of tablets to change and therefore the change is detected. + if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { t.Fatalf("DeleteTablet failed: %v", err) } - tablet.Alias.Uid = 1 - if err := ts.CreateTablet(context.Background(), tablet); err != nil { + tablet2.Alias.Uid = 3 + if err := ts.CreateTablet(context.Background(), tablet2); err != nil { t.Fatalf("CreateTablet failed: %v", err) } if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { t.Fatalf("FixShardReplication failed: %v", err) } tw.loadTablets() - - counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "ReplaceTablet": 1}) + } + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { + t.Errorf("fhc.GetAllTablets() = %+v; want %v => %+v", allTablets, key, tablet2) } - // Remove and check that it is detected as being gone. + // Remove the tablet and check that it is detected as being gone. 
if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { t.Fatalf("DeleteTablet failed: %v", err) } @@ -177,7 +226,11 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("FixShardReplication failed: %v", err) } tw.loadTablets() - counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) + } allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index d870dccc45..40a8c15d15 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -46,6 +46,7 @@ var ( cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") tabletFilters flagutil.StringListValue refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval") + refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") allowedTabletTypes []topodatapb.TabletType ) @@ -116,7 +117,7 @@ func createDiscoveryGateway(hc discovery.HealthCheck, serv srvtopo.Server, cell tr = fbs } - ctw := discovery.NewCellTabletsWatcher(topoServer, tr, c, *refreshInterval, *topoReadConcurrency) + ctw := discovery.NewCellTabletsWatcher(topoServer, tr, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency) dg.tabletsWatchers = append(dg.tabletsWatchers, ctw) } dg.QueryService = queryservice.Wrap(nil, dg.withRetry) From c7ed4f15d40327bc768c362af55d45dd829679c4 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: 
Tue, 22 May 2018 08:57:28 -0700 Subject: [PATCH 5/5] pass the refreshKnownTablets option in newRealtimeStats Signed-off-by: Michael Demmer --- go/vt/vtctld/realtime_status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtctld/realtime_status.go b/go/vt/vtctld/realtime_status.go index bfbdfb824a..bff54db1fe 100644 --- a/go/vt/vtctld/realtime_status.go +++ b/go/vt/vtctld/realtime_status.go @@ -51,7 +51,7 @@ func newRealtimeStats(ts *topo.Server) (*realtimeStats, error) { } var watchers []*discovery.TopologyWatcher for _, cell := range cells { - watcher := discovery.NewCellTabletsWatcher(ts, hc, cell, *vtctl.HealthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) + watcher := discovery.NewCellTabletsWatcher(ts, hc, cell, *vtctl.HealthCheckTopologyRefresh, true /* refreshKnownTablets */, discovery.DefaultTopoReadConcurrency) watchers = append(watchers, watcher) } r.cellWatchers = watchers