diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 887a7496b4..c2cf351eb5 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -121,9 +121,6 @@ Usage of vtctld: --service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice --sql-max-length-errors int truncate queries in error logs to the given length (default unlimited) --sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512) - --srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s) - --srv_topo_cache_ttl duration how long to use cached entries for topology (default 1s) - --srv_topo_timeout duration topo server timeout (default 5s) --stats_backend string The name of the registered push-based monitoring/stats backend to use --stats_combine_dimensions string List of dimensions to be combined into a single "all" value in exported stats vars --stats_common_tags string Comma-separated list of common tags for the stats backend. It provides both label and values. Example: label1:value1,label2:value2 diff --git a/go/flags/endtoend/vtexplain.txt b/go/flags/endtoend/vtexplain.txt index 5cb3aed7e9..2e6dab79e6 100644 --- a/go/flags/endtoend/vtexplain.txt +++ b/go/flags/endtoend/vtexplain.txt @@ -111,9 +111,6 @@ Usage of vtexplain: --sql-file string Identifies the file that contains the SQL commands to analyze --sql-max-length-errors int truncate queries in error logs to the given length (default unlimited) --sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512) - --srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s) - --srv_topo_cache_ttl duration how long to use cached entries for topology (default 1s) - --srv_topo_timeout duration topo server timeout (default 5s) --stats_backend string The name of the registered push-based monitoring/stats backend to use --stats_combine_dimensions string List of dimensions to be combined into a single "all" value in exported stats vars --stats_common_tags string Comma-separated list of common tags for the stats backend. It provides both label and values. Example: label1:value1,label2:value2 diff --git a/go/vt/srvtopo/discover_test.go b/go/vt/srvtopo/discover_test.go index 8f1c7e3c9f..c076ba0e7b 100644 --- a/go/vt/srvtopo/discover_test.go +++ b/go/vt/srvtopo/discover_test.go @@ -17,12 +17,11 @@ limitations under the License. package srvtopo import ( - "flag" + "context" "reflect" "sort" "testing" - - "context" + "time" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -51,8 +50,14 @@ func (a TargetArray) Less(i, j int) bool { func TestFindAllTargets(t *testing.T) { ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") - flag.Set("srv_topo_cache_refresh", "0s") // No caching values - flag.Set("srv_topo_cache_ttl", "0s") // No caching values + + srvTopoCacheRefresh = 0 + srvTopoCacheTTL = 0 + defer func() { + srvTopoCacheRefresh = 1 * time.Second + srvTopoCacheTTL = 1 * time.Second + + }() rs := NewResilientServer(ts, "TestFindAllKeyspaceShards") // No keyspace / shards. diff --git a/go/vt/srvtopo/query.go b/go/vt/srvtopo/query.go index 47bb9d6b1f..098f5c77bc 100644 --- a/go/vt/srvtopo/query.go +++ b/go/vt/srvtopo/query.go @@ -108,7 +108,7 @@ func (q *resilientQuery) getCurrentValue(ctx context.Context, wkey fmt.Stringer, } }() - newCtx, cancel := context.WithTimeout(ctx, *srvTopoTimeout) + newCtx, cancel := context.WithTimeout(ctx, srvTopoTimeout) defer cancel() result, err := q.query(newCtx, entry) diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index b4d3d5acc6..cac368dedd 100644 --- a/go/vt/srvtopo/resilient_server.go +++ b/go/vt/srvtopo/resilient_server.go @@ -17,11 +17,13 @@ limitations under the License. package srvtopo import ( - "flag" "time" + "github.com/spf13/pflag" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" ) @@ -38,11 +40,23 @@ var ( // setting the watch fails, we will use the last known value until // srv_topo_cache_ttl elapses and we only try to re-establish the watch // once every srv_topo_cache_refresh interval. - srvTopoTimeout = flag.Duration("srv_topo_timeout", 5*time.Second, "topo server timeout") - srvTopoCacheTTL = flag.Duration("srv_topo_cache_ttl", 1*time.Second, "how long to use cached entries for topology") - srvTopoCacheRefresh = flag.Duration("srv_topo_cache_refresh", 1*time.Second, "how frequently to refresh the topology for cached entries") + srvTopoTimeout = 5 * time.Second + srvTopoCacheTTL = 1 * time.Second + srvTopoCacheRefresh = 1 * time.Second ) +func registerFlags(fs *pflag.FlagSet) { + fs.DurationVar(&srvTopoTimeout, "srv_topo_timeout", srvTopoTimeout, "topo server timeout") + fs.DurationVar(&srvTopoCacheTTL, "srv_topo_cache_ttl", srvTopoCacheTTL, "how long to use cached entries for topology") + fs.DurationVar(&srvTopoCacheRefresh, "srv_topo_cache_refresh", srvTopoCacheRefresh, "how frequently to refresh the topology for cached entries") +} + +func init() { + servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vttablet", registerFlags) + servenv.OnParseFor("vtcombo", registerFlags) +} + const ( queryCategory = "query" cachedCategory = "cached" @@ -65,7 +79,7 @@ type ResilientServer struct { // NewResilientServer creates a new ResilientServer // based on the provided topo.Server. func NewResilientServer(base *topo.Server, counterPrefix string) *ResilientServer { - if *srvTopoCacheRefresh > *srvTopoCacheTTL { + if srvTopoCacheRefresh > srvTopoCacheTTL { log.Fatalf("srv_topo_cache_refresh must be less than or equal to srv_topo_cache_ttl") } @@ -80,9 +94,9 @@ func NewResilientServer(base *topo.Server, counterPrefix string) *ResilientServe return &ResilientServer{ topoServer: base, counts: counts, - SrvKeyspaceWatcher: NewSrvKeyspaceWatcher(base, counts, *srvTopoCacheRefresh, *srvTopoCacheTTL), - SrvVSchemaWatcher: NewSrvVSchemaWatcher(base, counts, *srvTopoCacheRefresh, *srvTopoCacheTTL), - SrvKeyspaceNamesQuery: NewSrvKeyspaceNamesQuery(base, counts, *srvTopoCacheRefresh, *srvTopoCacheTTL), + SrvKeyspaceWatcher: NewSrvKeyspaceWatcher(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL), + SrvVSchemaWatcher: NewSrvVSchemaWatcher(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL), + SrvKeyspaceNamesQuery: NewSrvKeyspaceNamesQuery(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL), } } diff --git a/go/vt/srvtopo/resilient_server_test.go b/go/vt/srvtopo/resilient_server_test.go index 6c3572b086..a57af927e5 100644 --- a/go/vt/srvtopo/resilient_server_test.go +++ b/go/vt/srvtopo/resilient_server_test.go @@ -45,11 +45,11 @@ import ( // TestGetSrvKeyspace will test we properly return updated SrvKeyspace. func TestGetSrvKeyspace(t *testing.T) { ts, factory := memorytopo.NewServerAndFactory("test_cell") - *srvTopoCacheTTL = time.Duration(200 * time.Millisecond) - *srvTopoCacheRefresh = time.Duration(80 * time.Millisecond) + srvTopoCacheTTL = 200 * time.Millisecond + srvTopoCacheRefresh = 80 * time.Millisecond defer func() { - *srvTopoCacheTTL = 1 * time.Second - *srvTopoCacheRefresh = 1 * time.Second + srvTopoCacheTTL = 1 * time.Second + srvTopoCacheRefresh = 1 * time.Second }() rs := NewResilientServer(ts, "TestGetSrvKeyspace") @@ -61,7 +61,7 @@ func TestGetSrvKeyspace(t *testing.T) { } // Wait until the cached error expires. - time.Sleep(*srvTopoCacheRefresh + 10*time.Millisecond) + time.Sleep(srvTopoCacheRefresh + 10*time.Millisecond) // Set SrvKeyspace with value want := &topodatapb.SrvKeyspace{} @@ -70,7 +70,7 @@ func TestGetSrvKeyspace(t *testing.T) { // wait until we get the right value var got *topodatapb.SrvKeyspace - expiry := time.Now().Add(*srvTopoCacheRefresh - 20*time.Millisecond) + expiry := time.Now().Add(srvTopoCacheRefresh - 20*time.Millisecond) for { ctx, cancel := context.WithCancel(context.Background()) got, err = rs.GetSrvKeyspace(ctx, "test_cell", "test_ks") @@ -181,7 +181,7 @@ func TestGetSrvKeyspace(t *testing.T) { forceErr := topo.NewError(topo.Timeout, "test topo error") factory.SetError(forceErr) - expiry = time.Now().Add(*srvTopoCacheTTL / 2) + expiry = time.Now().Add(srvTopoCacheTTL / 2) for { got, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks") if err != nil || !proto.Equal(want, got) { @@ -227,7 +227,7 @@ func TestGetSrvKeyspace(t *testing.T) { } // Now sleep for the rest of the interval and we should get the value again - time.Sleep(*srvTopoCacheRefresh) + time.Sleep(srvTopoCacheRefresh) got, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks") if err != nil || !proto.Equal(want, got) { t.Errorf("expected value to be restored, got %v", err) @@ -236,11 +236,11 @@ func TestGetSrvKeyspace(t *testing.T) { // Now sleep for the full TTL before setting the error again to test // that even when there is no activity on the key, it is still cached // for the full configured TTL. - time.Sleep(*srvTopoCacheTTL) + time.Sleep(srvTopoCacheTTL) forceErr = topo.NewError(topo.Interrupted, "another test topo error") factory.SetError(forceErr) - expiry = time.Now().Add(*srvTopoCacheTTL / 2) + expiry = time.Now().Add(srvTopoCacheTTL / 2) for { _, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks") if err != nil { @@ -274,7 +274,7 @@ func TestGetSrvKeyspace(t *testing.T) { // Check that the expected number of errors were counted during the // interval errorReqs := rs.counts.Counts()[errorCategory] - expectedErrors := int64(time.Since(errorTestStart) / *srvTopoCacheRefresh) + expectedErrors := int64(time.Since(errorTestStart) / srvTopoCacheRefresh) if errorReqs-errorReqsBefore > expectedErrors { t.Errorf("expected <= %v error requests got %d", expectedErrors, errorReqs-errorReqsBefore) } @@ -301,11 +301,11 @@ func TestGetSrvKeyspace(t *testing.T) { factory.SetError(forceErr) factory.Lock() go func() { - time.Sleep(*srvTopoCacheRefresh * 2) + time.Sleep(srvTopoCacheRefresh * 2) factory.Unlock() }() - expiry = time.Now().Add(*srvTopoCacheTTL / 2) + expiry = time.Now().Add(srvTopoCacheTTL / 2) for { got, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks") if err != nil || !proto.Equal(want, got) { @@ -330,7 +330,7 @@ func TestGetSrvKeyspace(t *testing.T) { // Clear the error, wait for things to proceed again factory.SetError(nil) - time.Sleep(*srvTopoCacheTTL) + time.Sleep(srvTopoCacheTTL) got, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks") if err != nil || !proto.Equal(want, got) { @@ -364,11 +364,11 @@ func TestGetSrvKeyspace(t *testing.T) { // the topo server upon failure. func TestSrvKeyspaceCachedError(t *testing.T) { ts := memorytopo.NewServer("test_cell") - *srvTopoCacheTTL = 100 * time.Millisecond - *srvTopoCacheRefresh = 40 * time.Millisecond + srvTopoCacheTTL = 100 * time.Millisecond + srvTopoCacheRefresh = 40 * time.Millisecond defer func() { - *srvTopoCacheTTL = 1 * time.Second - *srvTopoCacheRefresh = 1 * time.Second + srvTopoCacheTTL = 1 * time.Second + srvTopoCacheRefresh = 1 * time.Second }() rs := NewResilientServer(ts, "TestSrvKeyspaceCachedErrors") @@ -383,7 +383,7 @@ func TestSrvKeyspaceCachedError(t *testing.T) { t.Errorf("Error wasn't saved properly") } - time.Sleep(*srvTopoCacheTTL + 10*time.Millisecond) + time.Sleep(srvTopoCacheTTL + 10*time.Millisecond) // Ask again with a different context, should get an error and // save that context. ctx, cancel := context.WithCancel(ctx) @@ -431,7 +431,7 @@ func TestGetSrvKeyspaceCreated(t *testing.T) { } func TestWatchSrvVSchema(t *testing.T) { - *srvTopoCacheRefresh = 10 * time.Millisecond + srvTopoCacheRefresh = 10 * time.Millisecond ctx := context.Background() ts := memorytopo.NewServer("test_cell") rs := NewResilientServer(ts, "TestWatchSrvVSchema") @@ -517,11 +517,11 @@ func TestWatchSrvVSchema(t *testing.T) { func TestGetSrvKeyspaceNames(t *testing.T) { ts, factory := memorytopo.NewServerAndFactory("test_cell") - *srvTopoCacheTTL = 100 * time.Millisecond - *srvTopoCacheRefresh = 40 * time.Millisecond + srvTopoCacheTTL = 100 * time.Millisecond + srvTopoCacheRefresh = 40 * time.Millisecond defer func() { - *srvTopoCacheTTL = 1 * time.Second - *srvTopoCacheRefresh = 1 * time.Second + srvTopoCacheTTL = 1 * time.Second + srvTopoCacheRefresh = 1 * time.Second }() rs := NewResilientServer(ts, "TestGetSrvKeyspaceNames") @@ -551,7 +551,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { // requests aren't blocked factory.Lock() go func() { - time.Sleep(*srvTopoCacheTTL / 2) + time.Sleep(srvTopoCacheTTL / 2) factory.Unlock() }() @@ -568,7 +568,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { t.Errorf("GetSrvKeyspaceNames got %v want %v", names, wantNames) } - if time.Since(start) >= *srvTopoCacheRefresh+10*time.Millisecond { + if time.Since(start) >= srvTopoCacheRefresh+10*time.Millisecond { break } @@ -601,7 +601,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { } // Now, wait long enough that with a stale ask, we'll get an error - time.Sleep(*srvTopoCacheRefresh*2 + 2*time.Millisecond) + time.Sleep(srvTopoCacheRefresh*2 + 2*time.Millisecond) _, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", true) if err != forceErr { t.Fatalf("expected an error if asking for really stale cache data") @@ -646,9 +646,9 @@ func TestGetSrvKeyspaceNames(t *testing.T) { factory.SetError(forceErr) factory.Lock() - time.Sleep(*srvTopoCacheTTL) + time.Sleep(srvTopoCacheTTL) - timeoutCtx, cancel := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2) //nolint + timeoutCtx, cancel := context.WithTimeout(context.Background(), srvTopoCacheRefresh*2) //nolint defer cancel() _, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell", false) if err != context.DeadlineExceeded { @@ -671,11 +671,11 @@ func (w *watched) equals(other *watched) bool { func TestSrvKeyspaceWatcher(t *testing.T) { ts, factory := memorytopo.NewServerAndFactory("test_cell") - *srvTopoCacheTTL = time.Duration(100 * time.Millisecond) - *srvTopoCacheRefresh = time.Duration(40 * time.Millisecond) + srvTopoCacheTTL = 100 * time.Millisecond + srvTopoCacheRefresh = 40 * time.Millisecond defer func() { - *srvTopoCacheTTL = 1 * time.Second - *srvTopoCacheRefresh = 1 * time.Second + srvTopoCacheTTL = 1 * time.Second + srvTopoCacheRefresh = 1 * time.Second }() rs := NewResilientServer(ts, "TestGetSrvKeyspaceWatcher") @@ -795,11 +795,11 @@ func TestSrvKeyspaceWatcher(t *testing.T) { func TestSrvKeyspaceListener(t *testing.T) { ts, _ := memorytopo.NewServerAndFactory("test_cell") - *srvTopoCacheTTL = time.Duration(100 * time.Millisecond) - *srvTopoCacheRefresh = time.Duration(40 * time.Millisecond) + srvTopoCacheTTL = 100 * time.Millisecond + srvTopoCacheRefresh = 40 * time.Millisecond defer func() { - *srvTopoCacheTTL = 1 * time.Second - *srvTopoCacheRefresh = 1 * time.Second + srvTopoCacheTTL = 1 * time.Second + srvTopoCacheRefresh = 1 * time.Second }() rs := NewResilientServer(ts, "TestGetSrvKeyspaceWatcher")