зеркало из https://github.com/github/vitess-gh.git
Bug fix: Cache filtered out tablets in topology watcher to avoid unnecessary GetTablet calls to topo (#12194) (#12244)
* check filter later in loadTablets. Add tests to confirm expected behavior * remove unnecessary assignment * add some explanation around TestFilterByKeypsaceSkipsIgnoredTablets * simplify filter check logic as well as major test cleanup, using testify where possible --------- Signed-off-by: Brian Ramos <brirams@users.noreply.github.com> Co-authored-by: Brian Ramos <brirams@users.noreply.github.com>
This commit is contained in:
Родитель
1a950cd86f
Коммит
c35032feee
|
@ -199,9 +199,6 @@ func (tw *TopologyWatcher) loadTablets() {
|
|||
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
|
||||
return
|
||||
}
|
||||
if !(tw.tabletFilter == nil || tw.tabletFilter.IsIncluded(tablet.Tablet)) {
|
||||
return
|
||||
}
|
||||
tw.mu.Lock()
|
||||
aliasStr := topoproto.TabletAliasString(alias)
|
||||
newTablets[aliasStr] = &tabletInfo{
|
||||
|
@ -217,6 +214,10 @@ func (tw *TopologyWatcher) loadTablets() {
|
|||
tw.mu.Lock()
|
||||
|
||||
for alias, newVal := range newTablets {
|
||||
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
|
||||
continue
|
||||
}
|
||||
|
||||
// trust the alias from topo and add it if it doesn't exist
|
||||
if val, ok := tw.tablets[alias]; ok {
|
||||
// check if the host and port have changed. If yes, replace tablet.
|
||||
|
@ -236,6 +237,10 @@ func (tw *TopologyWatcher) loadTablets() {
|
|||
}
|
||||
|
||||
for _, val := range tw.tablets {
|
||||
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(val.tablet) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := newTablets[val.alias]; !ok {
|
||||
tw.healthcheck.RemoveTablet(val.tablet)
|
||||
topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1)
|
||||
|
|
|
@ -17,12 +17,13 @@ limitations under the License.
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"vitess.io/vitess/go/vt/logutil"
|
||||
|
@ -44,19 +45,14 @@ func checkOpCounts(t *testing.T, prevCounts, deltas map[string]int64) map[string
|
|||
newVal = 0
|
||||
}
|
||||
|
||||
if newVal != prevVal+delta {
|
||||
t.Errorf("expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal)
|
||||
}
|
||||
assert.Equal(t, newVal, prevVal+delta, "expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal)
|
||||
}
|
||||
return newCounts
|
||||
}
|
||||
|
||||
func checkChecksum(t *testing.T, tw *TopologyWatcher, want uint32) {
|
||||
t.Helper()
|
||||
got := tw.TopoChecksum()
|
||||
if want != got {
|
||||
t.Errorf("want checksum %v got %v", want, got)
|
||||
}
|
||||
assert.Equal(t, want, tw.TopoChecksum())
|
||||
}
|
||||
|
||||
func TestStartAndCloseTopoWatcher(t *testing.T) {
|
||||
|
@ -506,3 +502,115 @@ func TestFilterByKeyspace(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestFilterByKeypsaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher
|
||||
// has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want
|
||||
// to ensure that the TopologyWatcher:
|
||||
// - does not continuosly call GetTablets for tablets that do not satisfy the filter
|
||||
// - does not add or remove these filtered out tablets from the its healtcheck
|
||||
func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
|
||||
ts := memorytopo.NewServer("aa")
|
||||
fhc := NewFakeHealthCheck(nil)
|
||||
topologyWatcherOperations.ZeroAll()
|
||||
counts := topologyWatcherOperations.Counts()
|
||||
f := NewFilterByKeyspace(testKeyspacesToWatch)
|
||||
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)
|
||||
|
||||
counts = checkOpCounts(t, counts, map[string]int64{})
|
||||
checkChecksum(t, tw, 0)
|
||||
|
||||
// Add a tablet from a tracked keyspace to the topology.
|
||||
tablet := &topodatapb.Tablet{
|
||||
Alias: &topodatapb.TabletAlias{
|
||||
Cell: "aa",
|
||||
Uid: 0,
|
||||
},
|
||||
Hostname: "host1",
|
||||
PortMap: map[string]int32{
|
||||
"vt": 123,
|
||||
},
|
||||
Keyspace: "ks1",
|
||||
Shard: "shard",
|
||||
}
|
||||
require.NoError(t, ts.CreateTablet(context.Background(), tablet))
|
||||
|
||||
tw.loadTablets()
|
||||
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
|
||||
checkChecksum(t, tw, 3238442862)
|
||||
|
||||
// Check tablet is reported by HealthCheck
|
||||
allTablets := fhc.GetAllTablets()
|
||||
key := TabletToMapKey(tablet)
|
||||
assert.Contains(t, allTablets, key)
|
||||
assert.True(t, proto.Equal(tablet, allTablets[key]))
|
||||
|
||||
// Add a second tablet to the topology that should get filtered out by the keyspace filter
|
||||
tablet2 := &topodatapb.Tablet{
|
||||
Alias: &topodatapb.TabletAlias{
|
||||
Cell: "aa",
|
||||
Uid: 2,
|
||||
},
|
||||
Hostname: "host2",
|
||||
PortMap: map[string]int32{
|
||||
"vt": 789,
|
||||
},
|
||||
Keyspace: "ks3",
|
||||
Shard: "shard",
|
||||
}
|
||||
require.NoError(t, ts.CreateTablet(context.Background(), tablet2))
|
||||
|
||||
tw.loadTablets()
|
||||
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1})
|
||||
checkChecksum(t, tw, 2762153755)
|
||||
|
||||
// Check the new tablet is NOT reported by HealthCheck.
|
||||
allTablets = fhc.GetAllTablets()
|
||||
assert.Len(t, allTablets, 1)
|
||||
key = TabletToMapKey(tablet2)
|
||||
assert.NotContains(t, allTablets, key)
|
||||
|
||||
// Load the tablets again to show that when refreshKnownTablets is disabled,
|
||||
// only the list is read from the topo and the checksum doesn't change
|
||||
tw.loadTablets()
|
||||
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
|
||||
checkChecksum(t, tw, 2762153755)
|
||||
|
||||
// With refreshKnownTablets set to false, changes to the port map for the same tablet alias
|
||||
// should not be reflected in the HealtCheck state
|
||||
_, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error {
|
||||
t.PortMap["vt"] = 456
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
tw.loadTablets()
|
||||
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
|
||||
checkChecksum(t, tw, 2762153755)
|
||||
|
||||
allTablets = fhc.GetAllTablets()
|
||||
assert.Len(t, allTablets, 1)
|
||||
origKey := TabletToMapKey(tablet)
|
||||
tabletWithNewPort := proto.Clone(tablet).(*topodatapb.Tablet)
|
||||
tabletWithNewPort.PortMap["vt"] = 456
|
||||
keyWithNewPort := TabletToMapKey(tabletWithNewPort)
|
||||
assert.Contains(t, allTablets, origKey)
|
||||
assert.NotContains(t, allTablets, keyWithNewPort)
|
||||
|
||||
// Remove the tracked tablet from the topo and check that it is detected as being gone.
|
||||
require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias))
|
||||
|
||||
tw.loadTablets()
|
||||
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1})
|
||||
checkChecksum(t, tw, 789108290)
|
||||
assert.Empty(t, fhc.GetAllTablets())
|
||||
|
||||
// Remove ignored tablet and check that we didn't try to remove it from the health check
|
||||
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))
|
||||
|
||||
tw.loadTablets()
|
||||
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
|
||||
checkChecksum(t, tw, 0)
|
||||
assert.Empty(t, fhc.GetAllTablets())
|
||||
|
||||
tw.Stop()
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче