[cli] [topo] Migrate topo flags to pflags (#11393)

* [cli] [topo] Migrate topo flags to pflags

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* fix vtorc test

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* feat: fix VTOrc test

Signed-off-by: Manan Gupta <manan@planetscale.com>

* feat: remove binaries from the list that don't need the flags

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: update flag help outputs

Signed-off-by: Manan Gupta <manan@planetscale.com>

* code review

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

* fix import formatting

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>

Signed-off-by: Rameez Sajwani <rameezwazirali@hotmail.com>
Signed-off-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
This commit is contained in:
Rameez Sajwani 2022-09-30 19:19:38 -07:00 коммит произвёл GitHub
Родитель 2e05ab06c6
Коммит 0f310322f3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
38 изменённых файлов: 154 добавлений и 151 удалений

Просмотреть файл

@ -277,7 +277,7 @@ func commandTabletExternallyReparented(cmd *cobra.Command, args []string) error
}
func init() {
EmergencyReparentShard.Flags().DurationVar(&emergencyReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", *topo.RemoteOperationTimeout, "Time to wait for replicas to catch up in reparenting.")
EmergencyReparentShard.Flags().DurationVar(&emergencyReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up in reparenting.")
EmergencyReparentShard.Flags().StringVar(&emergencyReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary. If not specified, the vtctld will select the best candidate to promote.")
EmergencyReparentShard.Flags().BoolVar(&emergencyReparentShardOptions.PreventCrossCellPromotion, "prevent-cross-cell-promotion", false, "Only promotes a new primary from the same cell as the previous primary.")
EmergencyReparentShard.Flags().StringSliceVarP(&emergencyReparentShardOptions.IgnoreReplicaAliasStrList, "ignore-replicas", "i", nil, "Comma-separated, repeated list of replica tablet aliases to ignore during the emergency reparent.")
@ -287,7 +287,7 @@ func init() {
InitShardPrimary.Flags().BoolVar(&initShardPrimaryOptions.Force, "force", false, "Force the reparent even if the provided tablet is not writable or the shard primary.")
Root.AddCommand(InitShardPrimary)
PlannedReparentShard.Flags().DurationVar(&plannedReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", *topo.RemoteOperationTimeout, "Time to wait for replicas to catch up on replication both before and after reparenting.")
PlannedReparentShard.Flags().DurationVar(&plannedReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up on replication both before and after reparenting.")
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary.")
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.AvoidPrimaryAliasStr, "avoid-primary", "", "Alias of a tablet that should not be the primary; i.e. \"reparent to any other tablet if this one is the primary\".")
Root.AddCommand(PlannedReparentShard)

Просмотреть файл

@ -103,7 +103,6 @@ Global flags:
--port int port for the server
--pprof strings enable profiling
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--remote_operation_timeout duration time to wait for a remote operation (default 30s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
@ -111,9 +110,6 @@ Global flags:
--stderrthreshold severity logs at or above this threshold go to stderr (default 1)
--tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
--tablet_uid uint Tablet UID (default 41983)
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
-v, --v Level log level for V logs
--version print binary version
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging

Просмотреть файл

@ -91,7 +91,6 @@ Usage of mysqlctld:
--port int port for the server
--pprof strings enable profiling
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--remote_operation_timeout duration time to wait for a remote operation (default 30s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
@ -99,9 +98,6 @@ Usage of mysqlctld:
--stderrthreshold severity logs at or above this threshold go to stderr (default 1)
--tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
--tablet_uid uint Tablet UID (default 41983)
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
-v, --v Level log level for V logs
--version print binary version
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging

Просмотреть файл

@ -108,7 +108,6 @@ Flags:
--logtostderr log to standard error instead of files
--mysql_server_version string MySQL server version to advertise.
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--remote_operation_timeout duration time to wait for a remote operation (default 30s)
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--server string server to use for connection (required)
--stats_backend string The name of the registered push-based monitoring/stats backend to use
@ -117,9 +116,6 @@ Flags:
--stats_drop_variables string Variables to be dropped from the list of exported variables.
--stats_emit_period duration Interval between emitting stats to all registered backends (default 1m0s)
--stderrthreshold severity logs at or above this threshold go to stderr (default 1)
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
-v, --v Level log level for V logs
--version version for vtctldclient
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging

Просмотреть файл

@ -88,7 +88,6 @@ Usage of vttestserver:
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value
--rdonly_count int Rdonly tablets per shard (default 1)
--remote_operation_timeout duration time to wait for a remote operation (default 30s)
--replica_count int Replica tablets per shard (includes primary) (default 2)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--rng_seed int The random number generator seed to use when initializing with random data (see also --initialize_with_random_data). Multiple runs with the same seed will result with the same initial data. (default 123)
@ -113,13 +112,6 @@ Usage of vttestserver:
--topo_consul_lock_session_checks string List of checks for consul session. (default "serfHealth")
--topo_consul_lock_session_ttl string TTL for consul session.
--topo_consul_watch_poll_duration duration time of the long poll for watch queries. (default 30s)
--topo_etcd_lease_ttl int Lease TTL for locks and leader election. The client will use KeepAlive to keep the lease going. (default 30)
--topo_etcd_tls_ca string path to the ca to use to validate the server cert when connecting to the etcd topo server
--topo_etcd_tls_cert string path to the client cert to use to connect to the etcd topo server, requires topo_etcd_tls_key, enables TLS
--topo_etcd_tls_key string path to the client key to use to connect to the etcd topo server, enables TLS
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

Просмотреть файл

@ -583,7 +583,6 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard
// This does not start any process and user have to explicitly start all
// the required services (ex topo, vtgate, mysql and vttablet)
func (cluster *LocalProcessCluster) SetupCluster(keyspace *Keyspace, shards []Shard) (err error) {
log.Infof("Starting keyspace: %v", keyspace.Name)
if !cluster.ReusingVTDATAROOT {
@ -598,7 +597,6 @@ func (cluster *LocalProcessCluster) SetupCluster(keyspace *Keyspace, shards []Sh
// Create shard
for _, shard := range shards {
for _, tablet := range shard.Vttablets {
// Setup MysqlctlProcess
tablet.MysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory)
// Setup VttabletProcess

Просмотреть файл

@ -17,20 +17,20 @@ limitations under the License.
package readtopologyinstance
import (
"flag"
"fmt"
"os"
"testing"
"time"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/vtorc/utils"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/server"
_ "github.com/go-sql-driver/mysql"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -42,13 +42,19 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
}()
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
oldArgs := os.Args
defer func() {
// Restore the old args after the test
os.Args = oldArgs
}()
err := flag.Set("topo_global_server_address", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalAddress)
require.NoError(t, err)
err = flag.Set("topo_implementation", clusterInfo.ClusterInstance.VtctlProcess.TopoImplementation)
require.NoError(t, err)
err = flag.Set("topo_global_root", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)
// Change the args such that they match how we would invoke VTOrc
os.Args = []string{"vtorc",
"--topo_global_server_address", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalAddress,
"--topo_implementation", clusterInfo.ClusterInstance.VtctlProcess.TopoImplementation,
"--topo_global_root", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalRoot,
}
servenv.ParseFlags("vtorc")
config.Config.RecoveryPeriodBlockSeconds = 1
config.Config.InstancePollSeconds = 1
config.MarkConfigurationLoaded()

Просмотреть файл

@ -172,7 +172,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
// Since there is only one primary, we ignore cell and find the primary
aliases := make([]*topodatapb.TabletAlias, 0)
if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY {
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard)
if err != nil {
@ -185,12 +185,12 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
for _, cell := range tp.cells {
// check if cell is actually an alias
// non-blocking read so that this is fast
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
_, err := tp.ts.GetCellInfo(shortCtx, cell, false)
if err != nil {
// not a valid cell, check whether it is a cell alias
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false)
// if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
@ -205,7 +205,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}
}
for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
// match cell, keyspace and shard
sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard)
@ -222,7 +222,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if len(aliases) == 0 {
return nil
}
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
if err != nil {

Просмотреть файл

@ -269,7 +269,7 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupP
// the saved replicationPosition before proceeding
tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
remoteCtx, remoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer remoteCancel()
pos, err := getPrimaryPosition(remoteCtx, tmc, params.TopoServer, params.Keyspace, params.Shard)

Просмотреть файл

@ -360,7 +360,7 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
}
providedUUID := ""
rl := timer.NewRateLimiter(*topo.RemoteOperationTimeout / 4)
rl := timer.NewRateLimiter(topo.RemoteOperationTimeout / 4)
defer rl.Stop()
syncOperationExecuted := false

Просмотреть файл

@ -163,7 +163,7 @@ func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) e
// local-down-topo scenario would mean we never can delete it.
// (see https://github.com/vitessio/vitess/issues/8220).
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), *RemoteOperationTimeout)
ctx, cancel = context.WithTimeout(context.Background(), RemoteOperationTimeout)
defer cancel()
default:
// Context still has some time left, no need to make a new one.
@ -207,13 +207,13 @@ func (ts *Server) ExpandCells(ctx context.Context, cells string) ([]string, erro
}
expandCell := func(ctx context.Context, cell string) error {
shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, RemoteOperationTimeout)
defer cancel()
_, err := ts.GetCellInfo(shortCtx, cell, false /* strongRead */)
if err != nil {
// Not a valid cell name. Check whether it is an alias.
shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
shortCtx, cancel := context.WithTimeout(ctx, RemoteOperationTimeout)
defer cancel()
alias, err2 := ts.GetCellsAlias(shortCtx, cell, false /* strongRead */)

Просмотреть файл

@ -48,7 +48,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
nodePath := path.Join(s.root, filePath)
options := &api.QueryOptions{}
initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
initialCtx, initialCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer initialCancel()
pair, _, err := s.kv.Get(nodePath, options.WithContext(initialCtx))

Просмотреть файл

@ -23,13 +23,12 @@ import (
"github.com/spf13/pflag"
"vitess.io/vitess/go/vt/servenv"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
)
@ -39,8 +38,7 @@ var (
)
func init() {
for _, cmd := range []string{"vttablet", "vtctl", "vtctld", "mysqlctl", "mysqlctld", "vttestserver", "vtcombo", "vtctldclient", "vtexplain", "vtgate",
"vtgr", "vtorc", "vtbackup"} {
for _, cmd := range topo.FlagBinaries {
servenv.OnParseFor(cmd, registerEtcd2TopoLockFlags)
}
}

Просмотреть файл

@ -40,14 +40,12 @@ import (
"time"
"github.com/spf13/pflag"
"vitess.io/vitess/go/vt/servenv"
"go.etcd.io/etcd/client/pkg/v3/tlsutil"
"google.golang.org/grpc"
clientv3 "go.etcd.io/etcd/client/v3"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
)
@ -82,8 +80,7 @@ type Server struct {
}
func init() {
for _, cmd := range []string{"vttablet", "vtctl", "vtctld", "mysqlctl", "mysqlctld", "vttestserver", "vtcombo", "vtctldclient", "vtexplain", "vtgate",
"vtgr", "vtorc", "vtbackup"} {
for _, cmd := range topo.FlagBinaries {
servenv.OnParseFor(cmd, registerEtcd2TopoFlags)
}
topo.RegisterFactory("etcd2", Factory{})

Просмотреть файл

@ -37,7 +37,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
nodePath := path.Join(s.root, filePath)
// Get the initial version of the file
initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
initialCtx, initialCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer initialCancel()
initial, err := s.cli.Get(initialCtx, nodePath)
if err != nil {

Просмотреть файл

@ -34,7 +34,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
current := &topo.WatchData{}
// get current
initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
initialCtx, initialCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer initialCancel()
contents, ver, err := s.Get(initialCtx, filePath)

Просмотреть файл

@ -17,21 +17,21 @@ limitations under the License.
package topo
import (
"context"
"encoding/json"
"flag"
"os"
"os/user"
"path"
"sync"
"time"
"context"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"github.com/spf13/pflag"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
)
// This file contains utility methods and definitions to lock
@ -46,7 +46,7 @@ var (
// RemoteOperationTimeout is used for operations where we have to
// call out to another process.
// Used for RPC calls (including topo server calls)
RemoteOperationTimeout = flag.Duration("remote_operation_timeout", 30*time.Second, "time to wait for a remote operation")
RemoteOperationTimeout = 30 * time.Second
)
// Lock describes a long-running lock on a keyspace or a shard.
@ -62,6 +62,16 @@ type Lock struct {
Status string
}
func init() {
for _, cmd := range FlagBinaries {
servenv.OnParseFor(cmd, registerTopoLockFlags)
}
}
func registerTopoLockFlags(fs *pflag.FlagSet) {
fs.DurationVar(&RemoteOperationTimeout, "remote_operation_timeout", RemoteOperationTimeout, "time to wait for a remote operation")
}
// newLock creates a new Lock.
func newLock(action string) *Lock {
l := &Lock{
@ -234,7 +244,7 @@ func CheckKeyspaceLockedAndRenew(ctx context.Context, keyspace string) error {
func (l *Lock) lockKeyspace(ctx context.Context, ts *Server, keyspace string) (LockDescriptor, error) {
log.Infof("Locking keyspace %v for action %v", keyspace, l.Action)
ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, RemoteOperationTimeout)
defer cancel()
span, ctx := trace.NewSpan(ctx, "TopoServer.LockKeyspaceForAction")
@ -375,7 +385,7 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error {
func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) {
log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action)
ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, RemoteOperationTimeout)
defer cancel()
span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction")

Просмотреть файл

@ -44,15 +44,15 @@ package topo
import (
"context"
"flag"
"fmt"
"sync"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
"github.com/spf13/pflag"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
)
const (
@ -83,14 +83,12 @@ const (
// Path for all object types.
const (
CellsPath = "cells"
CellsAliasesPath = "cells_aliases"
KeyspacesPath = "keyspaces"
ShardsPath = "shards"
TabletsPath = "tablets"
MetadataPath = "metadata"
ExternalClusterMySQL = "mysql"
CellsPath = "cells"
CellsAliasesPath = "cells_aliases"
KeyspacesPath = "keyspaces"
ShardsPath = "shards"
TabletsPath = "tablets"
MetadataPath = "metadata"
ExternalClusterVitess = "vitess"
)
@ -158,15 +156,15 @@ type cellsToAliasesMap struct {
var (
// topoImplementation is the flag for which implementation to use.
topoImplementation = flag.String("topo_implementation", "", "the topology implementation to use")
topoImplementation string
// topoGlobalServerAddress is the address of the global topology
// server.
topoGlobalServerAddress = flag.String("topo_global_server_address", "", "the address of the global topology server")
topoGlobalServerAddress string
// topoGlobalRoot is the root path to use for the global topology
// server.
topoGlobalRoot = flag.String("topo_global_root", "", "the path of the global topology data in the global topology server")
topoGlobalRoot string
// factories has the factories for the Conn objects.
factories = make(map[string]Factory)
@ -174,8 +172,23 @@ var (
cellsAliases = cellsToAliasesMap{
cellsToAliases: make(map[string]string),
}
FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtexplain", "vtgate",
"vtgr", "vtorc", "vtbackup"}
)
func init() {
for _, cmd := range FlagBinaries {
servenv.OnParseFor(cmd, registerTopoFlags)
}
}
func registerTopoFlags(fs *pflag.FlagSet) {
fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use")
fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server")
fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server")
}
// RegisterFactory registers a Factory for an implementation for a Server.
// If an implementation with that name already exists, it log.Fatals out.
// Call this in the 'init' function in your topology implementation module.
@ -227,15 +240,15 @@ func OpenServer(implementation, serverAddress, root string) (*Server, error) {
// Open returns a Server using the command line parameter flags
// for implementation, address and root. It log.Exits out if an error occurs.
func Open() *Server {
if *topoGlobalServerAddress == "" && *topoImplementation != "k8s" {
if topoGlobalServerAddress == "" && topoImplementation != "k8s" {
log.Exitf("topo_global_server_address must be configured")
}
if *topoGlobalRoot == "" {
if topoGlobalRoot == "" {
log.Exit("topo_global_root must be non-empty")
}
ts, err := OpenServer(*topoImplementation, *topoGlobalServerAddress, *topoGlobalRoot)
ts, err := OpenServer(topoImplementation, topoGlobalServerAddress, topoGlobalRoot)
if err != nil {
log.Exitf("Failed to open topo server (%v,%v,%v): %v", *topoImplementation, *topoGlobalServerAddress, *topoGlobalRoot, err)
log.Exitf("Failed to open topo server (%v,%v,%v): %v", topoImplementation, topoGlobalServerAddress, topoGlobalRoot, err)
}
return ts
}

Просмотреть файл

@ -31,7 +31,7 @@ func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData,
zkPath := path.Join(zs.root, filePath)
// Get the initial value, set the initial watch
initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
initialCtx, initialCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer initialCancel()
data, stats, watch, err := zs.conn.GetW(initialCtx, zkPath)

Просмотреть файл

@ -119,7 +119,7 @@ func (s *VtctldServer) AddCellInfo(ctx context.Context, req *vtctldatapb.AddCell
span.Annotate("cell_root", req.CellInfo.Root)
span.Annotate("cell_address", req.CellInfo.ServerAddress)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err = s.ts.CreateCellInfo(ctx, req.Name, req.CellInfo); err != nil {
@ -139,7 +139,7 @@ func (s *VtctldServer) AddCellsAlias(ctx context.Context, req *vtctldatapb.AddCe
span.Annotate("cells_alias", req.Name)
span.Annotate("cells", strings.Join(req.Cells, ","))
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err = s.ts.CreateCellsAlias(ctx, req.Name, &topodatapb.CellsAlias{Cells: req.Cells}); err != nil {
@ -509,7 +509,7 @@ func (s *VtctldServer) ChangeTabletType(ctx context.Context, req *vtctldatapb.Ch
span.Annotate("dry_run", req.DryRun)
span.Annotate("tablet_type", topoproto.TabletTypeLString(req.DbType))
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tablet, err := s.ts.GetTablet(ctx, req.TabletAlias)
@ -772,7 +772,7 @@ func (s *VtctldServer) DeleteCellInfo(ctx context.Context, req *vtctldatapb.Dele
span.Annotate("cell", req.Name)
span.Annotate("force", req.Force)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err = s.ts.DeleteCellInfo(ctx, req.Name, req.Force); err != nil {
@ -791,7 +791,7 @@ func (s *VtctldServer) DeleteCellsAlias(ctx context.Context, req *vtctldatapb.De
span.Annotate("cells_alias", req.Name)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err = s.ts.DeleteCellsAlias(ctx, req.Name); err != nil {
@ -924,7 +924,7 @@ func (s *VtctldServer) DeleteSrvVSchema(ctx context.Context, req *vtctldatapb.De
span.Annotate("cell", req.Cell)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err = s.ts.DeleteSrvVSchema(ctx, req.Cell); err != nil {
@ -1479,7 +1479,7 @@ func (s *VtctldServer) GetSrvKeyspaceNames(ctx context.Context, req *vtctldatapb
cells := req.Cells
if len(cells) == 0 {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
cells, err = s.ts.GetCellInfoNames(ctx)
@ -1493,7 +1493,7 @@ func (s *VtctldServer) GetSrvKeyspaceNames(ctx context.Context, req *vtctldatapb
// Contact each cell sequentially, each cell is bounded by *topo.RemoteOperationTimeout.
// Total runtime is O(len(cells) * topo.RemoteOperationTimeout).
for _, cell := range cells {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
names, err2 := s.ts.GetSrvKeyspaceNames(ctx, cell)
if err2 != nil {
cancel()
@ -1663,7 +1663,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable
//
// Per-cell goroutines may also cancel this context if they fail and the
// request specified Strict=true to allow us to fail faster.
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var tabletMap map[string]*topo.TabletInfo
@ -2281,7 +2281,7 @@ func (s *VtctldServer) RefreshState(ctx context.Context, req *vtctldatapb.Refres
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tablet, err := s.ts.GetTablet(ctx, req.TabletAlias)
@ -2314,7 +2314,7 @@ func (s *VtctldServer) RefreshStateByShard(ctx context.Context, req *vtctldatapb
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
si, err := s.ts.GetShard(ctx, req.Keyspace, req.Shard)
@ -2986,7 +2986,7 @@ func (s *VtctldServer) ShardReplicationPositions(ctx context.Context, req *vtctl
span.Annotate("tablet_alias", alias)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var status *replicationdatapb.Status
@ -3028,7 +3028,7 @@ func (s *VtctldServer) ShardReplicationPositions(ctx context.Context, req *vtctl
span.Annotate("tablet_alias", alias)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
status, err := s.tmc.ReplicationStatus(ctx, tablet)
@ -3101,7 +3101,7 @@ func (s *VtctldServer) SleepTablet(ctx context.Context, req *vtctldatapb.SleepTa
if err != nil {
return nil, err
} else if !ok {
dur = *topo.RemoteOperationTimeout
dur = topo.RemoteOperationTimeout
}
span.Annotate("sleep_duration", dur.String())
@ -3409,7 +3409,7 @@ func (s *VtctldServer) UpdateCellInfo(ctx context.Context, req *vtctldatapb.Upda
span.Annotate("cell_server_address", req.CellInfo.ServerAddress)
span.Annotate("cell_root", req.CellInfo.Root)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var updatedCi *topodatapb.CellInfo
@ -3457,7 +3457,7 @@ func (s *VtctldServer) UpdateCellsAlias(ctx context.Context, req *vtctldatapb.Up
span.Annotate("cells_alias", req.Name)
span.Annotate("cells_alias_cells", strings.Join(req.CellsAlias.Cells, ","))
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var updatedCa *topodatapb.CellsAlias
@ -3490,7 +3490,7 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
span.Annotate("ping_tablets", req.PingTablets)
resp = &vtctldatapb.ValidateResponse{}
getKeyspacesCtx, getKeyspacesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getKeyspacesCtx, getKeyspacesCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getKeyspacesCancel()
keyspaces, err := s.ts.GetKeyspaces(getKeyspacesCtx)
@ -3513,7 +3513,7 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
cellSet := sets.NewString()
for _, keyspace := range keyspaces {
getShardNamesCtx, getShardNamesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getShardNamesCtx, getShardNamesCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
shards, err := s.ts.GetShardNames(getShardNamesCtx, keyspace)
getShardNamesCancel() // don't defer in a loop
@ -3525,7 +3525,7 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
}
for _, shard := range shards {
findAllTabletAliasesCtx, findAllTabletAliasesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
findAllTabletAliasesCtx, findAllTabletAliasesCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
aliases, err := s.ts.FindAllTabletAliasesInShard(findAllTabletAliasesCtx, keyspace, shard)
findAllTabletAliasesCancel() // don't defer in a loop
@ -3543,7 +3543,7 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
}
for _, cell := range cellSet.List() {
getTabletsByCellCtx, getTabletsByCellCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getTabletsByCellCtx, getTabletsByCellCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
aliases, err := s.ts.GetTabletAliasesByCell(getTabletsByCellCtx, cell)
getTabletsByCellCancel() // don't defer in a loop
@ -3565,7 +3565,7 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
key := topoproto.TabletAliasString(alias)
span.Annotate("tablet_alias", key)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := topo.Validate(ctx, s.ts, alias); err != nil {
@ -3625,7 +3625,7 @@ func (s *VtctldServer) ValidateKeyspace(ctx context.Context, req *vtctldatapb.Va
span.Annotate("ping_tablets", req.PingTablets)
resp = &vtctldatapb.ValidateKeyspaceResponse{}
getShardNamesCtx, getShardNamesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getShardNamesCtx, getShardNamesCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getShardNamesCancel()
shards, err := s.ts.GetShardNames(getShardNamesCtx, req.Keyspace)
@ -3819,7 +3819,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
span.Annotate("ping_tablets", req.PingTablets)
resp = &vtctldatapb.ValidateShardResponse{}
getShardCtx, getShardCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getShardCtx, getShardCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getShardCancel()
si, err := s.ts.GetShard(getShardCtx, req.Keyspace, req.Shard)
@ -3829,7 +3829,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
return resp, err
}
findAllTabletAliasesCtx, findAllTabletAliasesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
findAllTabletAliasesCtx, findAllTabletAliasesCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer findAllTabletAliasesCancel()
aliases, err := s.ts.FindAllTabletAliasesInShard(findAllTabletAliasesCtx, req.Keyspace, req.Shard)
@ -3839,7 +3839,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
return resp, err
}
getTabletMapCtx, getTabletMapCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getTabletMapCtx, getTabletMapCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getTabletMapCancel()
tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases)
@ -3878,7 +3878,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
go func(alias *topodatapb.TabletAlias) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := topo.Validate(ctx, s.ts, alias); err != nil {
@ -3904,7 +3904,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
return
}
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
replicaList, err := s.tmc.GetReplicas(ctx, primaryTabletInfo.Tablet)
@ -3962,7 +3962,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
go func(alias string, ti *topo.TabletInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := s.tmc.Ping(ctx, ti.Tablet); err != nil {

Просмотреть файл

@ -723,7 +723,7 @@ func TestSleepTablet(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, tt.expected, resp)
dur := expectedDur(t, tt.req.Duration, *topo.RemoteOperationTimeout)
dur := expectedDur(t, tt.req.Duration, topo.RemoteOperationTimeout)
assert.LessOrEqual(t, dur, sleepDur, "sleep should have taken at least %v; took %v", dur, sleepDur)
})
}

Просмотреть файл

@ -87,7 +87,7 @@ func commandInitShardPrimary(ctx context.Context, wr *wrangler.Wrangler, subFlag
}
force := subFlags.Bool("force", false, "will force the reparent even if the provided tablet is not writable or the shard primary")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up in reparenting")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", topo.RemoteOperationTimeout, "time to wait for replicas to catch up in reparenting")
if err := subFlags.Parse(args); err != nil {
return err
}
@ -110,7 +110,7 @@ func commandPlannedReparentShard(ctx context.Context, wr *wrangler.Wrangler, sub
return fmt.Errorf("active reparent commands disabled (unset the --disable_active_reparents flag to enable)")
}
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up on replication before and after reparenting")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", topo.RemoteOperationTimeout, "time to wait for replicas to catch up on replication before and after reparenting")
keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented")
newPrimary := subFlags.String("new_primary", "", "alias of a tablet that should be the new primary")
avoidTablet := subFlags.String("avoid_tablet", "", "alias of a tablet that should not be the primary, i.e. reparent to any other tablet if this one is the primary")
@ -154,7 +154,7 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s
return fmt.Errorf("active reparent commands disabled (unset the --disable_active_reparents flag to enable)")
}
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up in reparenting")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", topo.RemoteOperationTimeout, "time to wait for replicas to catch up in reparenting")
keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented")
newPrimary := subFlags.String("new_primary", "", "optional alias of a tablet that should be the new primary. If not specified, Vitess will select the best candidate")
preventCrossCellPromotion := subFlags.Bool("prevent_cross_cell_promotion", false, "only promotes a new primary from the same cell as the previous primary")

Просмотреть файл

@ -226,7 +226,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(
// First, we find the position of the current primary. Note that this is
// just a snapshot of the position, since we let it keep accepting writes
// until we're sure we want to proceed with the promotion.
snapshotCtx, snapshotCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
snapshotCtx, snapshotCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer snapshotCancel()
snapshotPos, err := pr.tmc.PrimaryPosition(snapshotCtx, currentPrimary.Tablet)
@ -260,7 +260,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(
pr.logger.Infof("demoting current primary: %v", currentPrimary.AliasString())
event.DispatchUpdate(ev, "demoting old primary")
demoteCtx, demoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
demoteCtx, demoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer demoteCancel()
primaryStatus, err := pr.tmc.DemotePrimary(demoteCtx, currentPrimary.Tablet)
@ -290,7 +290,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(
// that not enough time is left on the it to finish the rollback.
// We create a new background context to avoid a partial rollback, which
// could leave the cluster in a worse state than when we started.
undoCtx, undoCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
undoCtx, undoCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer undoCancel()
if undoErr := pr.tmc.UndoDemotePrimary(undoCtx, currentPrimary.Tablet, SemiSyncAckers(opts.durability, currentPrimary.Tablet) > 0); undoErr != nil {
@ -355,7 +355,7 @@ func (pr *PlannedReparenter) performPartialPromotionRecovery(ctx context.Context
// It's possible that a previous attempt to reparent failed to SetReadWrite,
// so call it here to make sure the underlying MySQL is read-write on the
// candidate primary.
setReadWriteCtx, setReadWriteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
setReadWriteCtx, setReadWriteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer setReadWriteCancel()
if err := pr.tmc.SetReadWrite(setReadWriteCtx, primaryElect); err != nil {
@ -363,7 +363,7 @@ func (pr *PlannedReparenter) performPartialPromotionRecovery(ctx context.Context
}
// The primary is already the one we want according to its tablet record.
refreshCtx, refreshCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
// Get the replication position so we can try to fix the replicas (back in
@ -415,7 +415,7 @@ func (pr *PlannedReparenter) performPotentialPromotion(
rec concurrency.AllErrorRecorder
)
stopAllCtx, stopAllCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
stopAllCtx, stopAllCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer stopAllCancel()
for alias, tabletInfo := range tabletMap {
@ -501,7 +501,7 @@ func (pr *PlannedReparenter) performPotentialPromotion(
}
// Promote the candidate primary to type:PRIMARY.
promoteCtx, promoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
promoteCtx, promoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer promoteCancel()
rp, err := pr.tmc.PromoteReplica(promoteCtx, primaryElect, SemiSyncAckers(opts.durability, primaryElect) > 0)

Просмотреть файл

@ -1160,7 +1160,7 @@ func commandChangeTabletType(ctx context.Context, wr *wrangler.Wrangler, subFlag
return err
}
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if *dryRun {

Просмотреть файл

@ -415,7 +415,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
shardStream, ok := workflow.ShardStreams[shardStreamKey]
if !ok {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
si, err := s.ts.GetShard(ctx, req.Keyspace, tablet.Shard)

Просмотреть файл

@ -153,7 +153,7 @@ func (vx *VExec) QueryContext(ctx context.Context, query string) (map[*topo.Tabl
func (vx *VExec) initialize(ctx context.Context) error {
vx.primaries = nil
getShardsCtx, getShardsCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
getShardsCtx, getShardsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getShardsCancel()
shards, err := vx.ts.GetShardNames(getShardsCtx, vx.keyspace)
@ -168,7 +168,7 @@ func (vx *VExec) initialize(ctx context.Context) error {
primaries := make([]*topo.TabletInfo, 0, len(shards))
for _, shard := range shards {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
si, err := vx.ts.GetShard(ctx, vx.keyspace, shard)

Просмотреть файл

@ -104,7 +104,7 @@ func OpenTabletDiscoveryWithAcitve(ctx context.Context, cellsToWatch, clustersTo
tmclient.NewTabletManagerClient(),
)
var shards []*controller.GRShard
ctx, cancel := context.WithTimeout(vtgr.ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(vtgr.ctx, topo.RemoteOperationTimeout)
defer cancel()
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") {

Просмотреть файл

@ -61,7 +61,7 @@ func SwitchPrimary(newPrimaryKey, oldPrimaryKey InstanceKey) error {
log.Errorf("Unexpected: tablet type did not change to primary: %v", newPrimaryTablet.Type)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
_, err = TopoServ.UpdateShardFields(ctx, newPrimaryTablet.Keyspace, newPrimaryTablet.Shard, func(si *topo.ShardInfo) error {
if proto.Equal(si.PrimaryAlias, newPrimaryTablet.Alias) && proto.Equal(si.PrimaryTermStartTime, newPrimaryTablet.PrimaryTermStartTime) {
@ -103,12 +103,12 @@ func ChangeTabletType(instanceKey InstanceKey, tabletType topodatapb.TabletType,
return nil, err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
if err := tmc.ChangeType(tmcCtx, tablet, tabletType, semiSync); err != nil {
return nil, err
}
tsCtx, tsCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
tsCtx, tsCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tsCancel()
ti, err := TopoServ.GetTablet(tsCtx, tablet.Alias)
if err != nil {
@ -128,7 +128,7 @@ func ResetReplicationParameters(instanceKey InstanceKey) error {
return err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
if err := tmc.ResetReplicationParameters(tmcCtx, tablet); err != nil {
return err
@ -143,7 +143,7 @@ func FullStatus(instanceKey InstanceKey) (*replicationdatapb.FullStatus, error)
return nil, err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
return tmc.FullStatus(tmcCtx, tablet)
}

Просмотреть файл

@ -32,7 +32,7 @@ import (
func RefreshAllKeyspaces() {
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
var err error
// Get all the keyspaces
@ -62,7 +62,7 @@ func RefreshAllKeyspaces() {
// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for idx, keyspace := range keyspaces {
@ -83,7 +83,7 @@ func RefreshAllKeyspaces() {
// RefreshKeyspace refreshes the keyspace's information for the given keyspace from the topo
func RefreshKeyspace(keyspaceName string) error {
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer refreshCancel()
return refreshKeyspace(refreshCtx, keyspaceName)
}

Просмотреть файл

@ -25,6 +25,7 @@ import (
"time"
"github.com/spf13/pflag"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
@ -88,7 +89,7 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey), forceRefres
return
}
if len(clustersToWatch) == 0 { // all known clusters
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
if err != nil {
@ -96,7 +97,7 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey), forceRefres
return
}
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, cell := range cells {
@ -117,7 +118,7 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey), forceRefres
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
@ -138,7 +139,7 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey), forceRefres
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
return
}
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, ks := range keyspaceShards {
@ -168,7 +169,7 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance
// change the replication information for the entire cluster drastically enough to warrant a full forceful refresh
func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string) {
log.Infof("force refresh of all tablets in shard - %v/%v", keyspace, shard)
refreshCtx, refreshCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(instanceKey *inst.InstanceKey) {
DiscoverInstance(*instanceKey, true)

Просмотреть файл

@ -550,7 +550,7 @@ func (tm *TabletManager) startReplication(ctx context.Context, pos mysql.Positio
// the initial pos before proceeding
tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
remoteCtx, remoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer remoteCancel()
posStr, err := tmc.PrimaryPosition(remoteCtx, primary.Tablet)
if err != nil {

Просмотреть файл

@ -166,7 +166,7 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st
// success (we successfully synchronized), but the returned primaryAlias will be
// different from the input tablet.Alias.
func syncShardPrimary(ctx context.Context, ts *topo.Server, tablet *topodatapb.Tablet, PrimaryTermStartTime time.Time) (primaryAlias *topodatapb.TabletAlias, shouldDemote bool, err error) {
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var shardInfo *topo.ShardInfo
@ -220,7 +220,7 @@ func (tm *TabletManager) endPrimaryTerm(ctx context.Context, primaryAlias *topod
if mysqlctl.DisableActiveReparents {
// Don't touch anything at the MySQL level. Just update tablet state.
log.Infof("Active reparents are disabled; updating tablet state only.")
changeTypeCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
changeTypeCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := tm.tmState.ChangeTabletType(changeTypeCtx, tm.baseTabletType, DBActionNone); err != nil {
return vterrors.Wrapf(err, "failed to change type to %v", tm.baseTabletType)
@ -234,12 +234,12 @@ func (tm *TabletManager) endPrimaryTerm(ctx context.Context, primaryAlias *topod
// no return. Instead, we should leave partial results and retry the rest
// later.
log.Infof("Active reparents are enabled; converting MySQL to replica.")
demotePrimaryCtx, cancelDemotePrimary := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
demotePrimaryCtx, cancelDemotePrimary := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancelDemotePrimary()
if _, err := tm.demotePrimary(demotePrimaryCtx, false /* revertPartialFailure */); err != nil {
return vterrors.Wrap(err, "failed to demote primary")
}
setPrimaryCtx, cancelSetPrimary := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
setPrimaryCtx, cancelSetPrimary := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancelSetPrimary()
log.Infof("Attempting to reparent self to new primary %v.", primaryAliasStr)
if primaryAlias == nil {

Просмотреть файл

@ -452,7 +452,7 @@ func (tm *TabletManager) Close() {
return nil
}
updateCtx, updateCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
updateCtx, updateCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer updateCancel()
if _, err := tm.TopoServer.UpdateTabletFields(updateCtx, tm.tabletAlias, f); err != nil {

Просмотреть файл

@ -411,7 +411,7 @@ func (ts *tmState) publishStateLocked(ctx context.Context) {
return
}
// Fast path: publish immediately.
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
_, err := ts.tm.TopoServer.UpdateTabletFields(ctx, ts.tm.tabletAlias, func(tablet *topodatapb.Tablet) error {
if err := topotools.CheckOwnership(tablet, ts.tablet); err != nil {
@ -444,7 +444,7 @@ func (ts *tmState) retryPublish() {
for {
// Retry immediately the first time because the previous failure might have been
// due to an expired context.
ctx, cancel := context.WithTimeout(ts.ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ts.ctx, topo.RemoteOperationTimeout)
_, err := ts.tm.TopoServer.UpdateTabletFields(ctx, ts.tm.tabletAlias, func(tablet *topodatapb.Tablet) error {
if err := topotools.CheckOwnership(tablet, ts.tablet); err != nil {
log.Error(err)

Просмотреть файл

@ -718,7 +718,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
primary.StopActionLoop(t)
// set a short timeout so that we don't have to wait 30 seconds
*topo.RemoteOperationTimeout = 2 * time.Second
topo.RemoteOperationTimeout = 2 * time.Second
// Restore should still succeed
require.NoError(t, destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */))
// verify the full status

Просмотреть файл

@ -570,7 +570,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
}
// We set a topo timeout since we contact topo for the shard record.
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var sourceKeyspace string
sourceShards := sets.NewString()

Просмотреть файл

@ -36,7 +36,7 @@ var (
// lock actions use RemoteOperationTimeout,
// so basing this to be greater than RemoteOperationTimeout is good.
// Use this as the default value for Context that need a deadline.
DefaultActionTimeout = *topo.RemoteOperationTimeout * 4
DefaultActionTimeout = topo.RemoteOperationTimeout * 4
)
// Wrangler manages complex actions on the topology, like reparents,

Просмотреть файл

@ -1,4 +1,4 @@
TOPOLOGY_FLAGS=-topo_implementation consul -topo_global_server_address consul1:8500 -topo_global_root vitess/global
TOPOLOGY_FLAGS=--topo_implementation consul --topo_global_server_address consul1:8500 --topo_global_root vitess/global
GRPC_PORT=15999
WEB_PORT=8080
MYSQL_PORT=15306