Mirror of https://github.com/github/vitess-gh.git
Moving the lock timeout flag to the actionnode module. That's the only place
it's used, so it no longer needs to be passed around everywhere.
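In short: instead of each binary defining its own lock-timeout flag and threading the value through wrangler.New, worker.NewInstance, tabletmanager.NewActionAgent and topotools.RebuildShard, the actionnode package now owns a single -lock_timeout flag and applies it inside its Lock* methods. A condensed Go sketch of the new pattern follows; the flag definition and the context.WithTimeout lines are taken from the diff below, while the lockWithTimeout helper is illustrative only and not part of the commit.

package actionnode

import (
	"flag"
	"time"

	"golang.org/x/net/context"
)

var (
	// DefaultLockTimeout is a good value to use as a default for
	// locking a shard / keyspace.
	DefaultLockTimeout = 30 * time.Second

	// LockTimeout is the command line flag that introduces a shorter
	// timeout for locking topology structures.
	LockTimeout = flag.Duration("lock_timeout", DefaultLockTimeout, "timeout for acquiring topology locks")
)

// lockWithTimeout illustrates where the flag is applied: each Lock* method now
// bounds the caller's context itself, so callers no longer pass a timeout in.
func lockWithTimeout(ctx context.Context, lock func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
	defer cancel()
	return lock(ctx)
}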
Parent: db9290f3be
Commit: 9a0c0c4fdd
@@ -154,7 +154,7 @@ func initTabletMap(ts topo.Server, topology string, mysqld mysqlctl.MysqlDaemon,
 // Rebuild the SrvKeyspace objects, so we can support range-based
 // sharding queries.
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, nil, 30*time.Second /*lockTimeout*/)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, nil)
 for keyspace := range keyspaceMap {
 if err := wr.RebuildKeyspaceGraph(ctx, keyspace, nil, true); err != nil {
 log.Fatalf("cannot rebuild %v: %v", keyspace, err)

@@ -25,8 +25,7 @@ import (
 )
 
 var (
-waitTime        = flag.Duration("wait-time", 24*time.Hour, "time to wait on an action")
-lockWaitTimeout = flag.Duration("lock-wait-timeout", time.Minute, "time to wait for a lock before starting an action")
+waitTime = flag.Duration("wait-time", 24*time.Hour, "time to wait on an action")
 )
 
 func init() {
@@ -82,7 +81,7 @@ func main() {
 defer topo.CloseServers()
 
 ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
-wr := wrangler.New(logutil.NewConsoleLogger(), topoServer, tmclient.NewTabletManagerClient(), *lockWaitTimeout)
+wr := wrangler.New(logutil.NewConsoleLogger(), topoServer, tmclient.NewTabletManagerClient())
 installSignalHandlers(cancel)
 
 for _, f := range initFuncs {

@@ -11,7 +11,6 @@ import (
 "github.com/youtube/vitess/go/acl"
 "github.com/youtube/vitess/go/vt/logutil"
-"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
 "github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
 "github.com/youtube/vitess/go/vt/topo"
 "github.com/youtube/vitess/go/vt/topo/topoproto"
@@ -22,7 +21,6 @@ import (
 var (
 actionTimeout = flag.Duration("action_timeout", wrangler.DefaultActionTimeout, "time to wait for an action before resorting to force")
-lockTimeout = flag.Duration("lock_timeout", actionnode.DefaultLockTimeout, "lock time for wrangler/topo operations")
 )
 
 // ActionResult contains the result of an action. If Error, the action failed.
@@ -102,7 +100,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName,
 }
 
 ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
-wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
+wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
 output, err := action(ctx, wr, keyspace, r)
 cancel()
 if err != nil {
@@ -129,7 +127,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke
 }
 
 ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
-wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
+wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
 output, err := action(ctx, wr, keyspace, shard, r)
 cancel()
 if err != nil {
@@ -163,7 +161,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st
 // run the action
 ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
-wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
+wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
 output, err := action.method(ctx, wr, tabletAlias, r)
 cancel()
 if err != nil {

@@ -11,7 +11,6 @@ import (
 "net/http"
 "net/http/httptest"
 "testing"
-"time"
 
 "golang.org/x/net/context"
@@ -66,7 +65,7 @@ func TestAPI(t *testing.T) {
 KeyRange: &pb.KeyRange{Start: nil, End: []byte{0x80}},
 PortMap: map[string]int32{"vt": 200},
 })
-topotools.RebuildShard(ctx, logutil.NewConsoleLogger(), ts, "ks1", "-80", cells, 10*time.Second)
+topotools.RebuildShard(ctx, logutil.NewConsoleLogger(), ts, "ks1", "-80", cells)
 
 // Populate fake actions.
 actionRepo.RegisterKeyspaceAction("TestKeyspaceAction",

@@ -80,7 +80,7 @@ func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *pb
 func TestTabletData(t *testing.T) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 if err := ts.CreateKeyspace(context.Background(), "ks", &pbt.Keyspace{
 ShardingColumnName: "keyspace_id",

@@ -12,7 +12,6 @@ import (
 "github.com/youtube/vitess/go/vt/janitor"
 "github.com/youtube/vitess/go/vt/logutil"
 "github.com/youtube/vitess/go/vt/servenv"
-"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
 "github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
 "github.com/youtube/vitess/go/vt/topo"
 "github.com/youtube/vitess/go/vt/wrangler"
@@ -20,7 +19,6 @@ import (
 var (
 sleepTime = flag.Duration("sleep_time", 3*time.Minute, "how long to sleep between janitor runs")
-lockTimeout = flag.Duration("lock_timeout", actionnode.DefaultLockTimeout, "lock time for wrangler/topo operations")
 keyspace = flag.String("keyspace", "", "keyspace to manage")
 shard = flag.String("shard", "", "shard to manage")
 dryRunModules flagutil.StringListValue
@@ -53,7 +51,7 @@ func main() {
 ts := topo.GetServer()
 
-scheduler, err := janitor.New(*keyspace, *shard, ts, wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), *lockTimeout), *sleepTime)
+scheduler, err := janitor.New(*keyspace, *shard, ts, wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()), *sleepTime)
 if err != nil {
 log.Errorf("janitor.New: %v", err)
 exit.Return(1)

@@ -16,7 +16,6 @@ import (
 "github.com/youtube/vitess/go/vt/tableacl"
 "github.com/youtube/vitess/go/vt/tableacl/simpleacl"
 "github.com/youtube/vitess/go/vt/tabletmanager"
-"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
 "github.com/youtube/vitess/go/vt/tabletserver"
 "github.com/youtube/vitess/go/vt/topo"
 "github.com/youtube/vitess/go/vt/topo/topoproto"
@@ -33,7 +32,6 @@ var (
 tableAclConfig = flag.String("table-acl-config", "", "path to table access checker config file")
 tabletPath = flag.String("tablet-path", "", "tablet alias")
 overridesFile = flag.String("schema-override", "", "schema overrides file")
-lockTimeout = flag.Duration("lock_timeout", actionnode.DefaultLockTimeout, "lock time for wrangler/topo operations")
 
 agent *tabletmanager.ActionAgent
 )
@@ -108,7 +106,7 @@ func main() {
 if servenv.GRPCPort != nil {
 gRPCPort = int32(*servenv.GRPCPort)
 }
-agent, err = tabletmanager.NewActionAgent(context.Background(), mysqld, qsc, tabletAlias, dbcfgs, mycnf, int32(*servenv.Port), gRPCPort, *overridesFile, *lockTimeout)
+agent, err = tabletmanager.NewActionAgent(context.Background(), mysqld, qsc, tabletAlias, dbcfgs, mycnf, int32(*servenv.Port), gRPCPort, *overridesFile)
 if err != nil {
 log.Error(err)
 exit.Return(1)

@@ -60,7 +60,7 @@ func main() {
 ts := topo.GetServer()
 defer topo.CloseServers()
 
-wi = worker.NewInstance(ts, *cell, 30*time.Second, *commandDisplayInterval)
+wi = worker.NewInstance(ts, *cell, *commandDisplayInterval)
 wi.InstallSignalHandlers()
 wi.InitStatusHandling()

@@ -8,6 +8,7 @@ package actionnode
 // topology server.
 
 import (
+"flag"
 "time"
 
 log "github.com/golang/glog"
@@ -20,6 +21,10 @@ var (
 // DefaultLockTimeout is a good value to use as a default for
 // locking a shard / keyspace.
 DefaultLockTimeout = 30 * time.Second
+
+// LockTimeout is the command line flag that introduces a shorter
+// timeout for locking topology structures.
+LockTimeout = flag.Duration("lock_timeout", DefaultLockTimeout, "timeout for acquiring topology locks")
 )
 
 // LockKeyspace will lock the keyspace in the topology server.
@@ -27,6 +32,9 @@ var (
 func (n *ActionNode) LockKeyspace(ctx context.Context, ts topo.Server, keyspace string) (lockPath string, err error) {
 log.Infof("Locking keyspace %v for action %v", keyspace, n.Action)
 
+ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
+defer cancel()
+
 span := trace.NewSpanFromContext(ctx)
 span.StartClient("TopoServer.LockKeyspaceForAction")
 span.Annotate("action", n.Action)
@@ -89,6 +97,9 @@ func (n *ActionNode) UnlockKeyspace(ctx context.Context, ts topo.Server, keyspac
 func (n *ActionNode) LockShard(ctx context.Context, ts topo.Server, keyspace, shard string) (lockPath string, err error) {
 log.Infof("Locking shard %v/%v for action %v", keyspace, shard, n.Action)
 
+ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
+defer cancel()
+
 span := trace.NewSpanFromContext(ctx)
 span.StartClient("TopoServer.LockShardForAction")
 span.Annotate("action", n.Action)
@@ -153,6 +164,9 @@ func (n *ActionNode) UnlockShard(ctx context.Context, ts topo.Server, keyspace,
 func (n *ActionNode) LockSrvShard(ctx context.Context, ts topo.Server, cell, keyspace, shard string) (lockPath string, err error) {
 log.Infof("Locking serving shard %v/%v/%v for action %v", cell, keyspace, shard, n.Action)
 
+ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
+defer cancel()
+
 span := trace.NewSpanFromContext(ctx)
 span.StartClient("TopoServer.LockSrvShardForAction")
 span.Annotate("action", n.Action)

@@ -73,7 +73,6 @@ type ActionAgent struct {
 DBConfigs dbconfigs.DBConfigs
 SchemaOverrides []tabletserver.SchemaOverride
 BinlogPlayerMap *BinlogPlayerMap
-LockTimeout time.Duration
 
 // exportStats is set only for production tablet.
 exportStats bool
@@ -158,7 +157,6 @@ func NewActionAgent(
 mycnf *mysqlctl.Mycnf,
 port, gRPCPort int32,
 overridesFile string,
-lockTimeout time.Duration,
 ) (agent *ActionAgent, err error) {
 schemaOverrides := loadSchemaOverrides(overridesFile)
 
@@ -173,7 +171,6 @@ func NewActionAgent(
 MysqlDaemon: mysqld,
 DBConfigs: dbcfgs,
 SchemaOverrides: schemaOverrides,
-LockTimeout: lockTimeout,
 History: history.New(historyLength),
 lastHealthMapCount: stats.NewInt("LastHealthMapCount"),
 _healthy: fmt.Errorf("healthcheck not run yet"),

@@ -7,7 +7,6 @@ package tabletmanager
 import (
 "fmt"
 "testing"
-"time"
 
 "github.com/youtube/vitess/go/history"
 "github.com/youtube/vitess/go/stats"
@@ -43,7 +42,6 @@ func TestInitTablet(t *testing.T) {
 DBConfigs: dbconfigs.DBConfigs{},
 SchemaOverrides: nil,
 BinlogPlayerMap: nil,
-LockTimeout: 10 * time.Second,
 batchCtx: ctx,
 History: history.New(historyLength),
 lastHealthMapCount: new(stats.Int),

@@ -7,7 +7,6 @@ package topotools
 import (
 "fmt"
 "sync"
-"time"
 
 "github.com/youtube/vitess/go/trace"
 "github.com/youtube/vitess/go/vt/concurrency"
@@ -25,7 +24,7 @@ import (
 //
 // This function will start each cell over from the beginning on ErrBadVersion,
 // so it doesn't need a lock on the shard.
-func RebuildShard(ctx context.Context, log logutil.Logger, ts topo.Server, keyspace, shard string, cells []string, lockTimeout time.Duration) (*topo.ShardInfo, error) {
+func RebuildShard(ctx context.Context, log logutil.Logger, ts topo.Server, keyspace, shard string, cells []string) (*topo.ShardInfo, error) {
 log.Infof("RebuildShard %v/%v", keyspace, shard)
 
 span := trace.NewSpanFromContext(ctx)

@@ -8,7 +8,6 @@ import (
 "fmt"
 "strings"
 "testing"
-"time"
 
 "golang.org/x/net/context"
@@ -70,7 +69,7 @@ func TestRebuildShard(t *testing.T) {
 replicaInfo := addTablet(ctx, t, ts, 2, cells[0], pb.TabletType_REPLICA)
 
 // Do an initial rebuild.
-if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells, time.Minute); err != nil {
+if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells); err != nil {
 t.Fatalf("RebuildShard: %v", err)
 }
 
@@ -95,7 +94,7 @@ func TestRebuildShard(t *testing.T) {
 if err := ts.UpdateTablet(ctx, masterInfo); err != nil {
 t.Fatalf("UpdateTablet: %v", err)
 }
-if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells, time.Minute); err != nil {
+if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells); err != nil {
 t.Fatalf("RebuildShard: %v", err)
 }
 
@@ -104,7 +103,7 @@ func TestRebuildShard(t *testing.T) {
 if err := ts.UpdateTablet(ctx, replicaInfo); err != nil {
 t.Fatalf("UpdateTablet: %v", err)
 }
-if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells, time.Minute); err != nil {
+if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells); err != nil {
 t.Fatalf("RebuildShard: %v", err)
 }

@@ -50,7 +50,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(ctx context.Context, query *gorpcproto
 }()
 
 // create the wrangler
-wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient(), query.LockTimeout)
+wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient())
 // FIXME(alainjobart) use a single context, copy the source info from it
 ctx, cancel := context.WithTimeout(context.TODO(), query.ActionTimeout)

@@ -10,7 +10,6 @@ package grpcvtctlserver
 
 import (
 "sync"
-"time"
 
 "google.golang.org/grpc"
@@ -60,7 +59,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(args *pb.ExecuteVtctlCommandRequest, s
 }()
 
 // create the wrangler
-wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient(), time.Duration(args.LockTimeout))
+wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient())
 
 // execute the command
 err = vtctl.RunCommand(stream.Context(), wr, args.Args)

@@ -45,12 +45,11 @@ type Instance struct {
 topoServer topo.Server
 cell string
-lockTimeout time.Duration
 commandDisplayInterval time.Duration
 }
 
 // NewInstance creates a new Instance.
-func NewInstance(ts topo.Server, cell string, lockTimeout, commandDisplayInterval time.Duration) *Instance {
+func NewInstance(ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
 wi := &Instance{topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
 // Note: setAndStartWorker() also adds a MemoryLogger for the webserver.
 wi.wr = wi.CreateWrangler(logutil.NewConsoleLogger())
@@ -59,7 +58,7 @@ func NewInstance(ts topo.Server, cell string, lockTimeout, commandDisplayInterva
 
 // CreateWrangler creates a new wrangler using the instance specific configuration.
 func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler {
-return wrangler.New(logger, wi.topoServer, tmclient.NewTabletManagerClient(), wi.lockTimeout)
+return wrangler.New(logger, wi.topoServer, tmclient.NewTabletManagerClient())
 }
 
 // setAndStartWorker will set the current worker.

@@ -244,7 +244,7 @@ func TestSplitClonePopulateBlpCheckpoint(t *testing.T) {
 func testSplitClone(t *testing.T, strategy string) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wi := NewInstance(ts, "cell1", time.Second, time.Second)
+wi := NewInstance(ts, "cell1", time.Second)
 
 if err := ts.CreateKeyspace(context.Background(), "ks", &pbt.Keyspace{
 ShardingColumnName: "keyspace_id",

@@ -150,7 +150,7 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *pb.Targ
 func TestSplitDiff(t *testing.T) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wi := NewInstance(ts, "cell1", time.Second, time.Second)
+wi := NewInstance(ts, "cell1", time.Second)
 ctx := context.Background()
 
 if err := ts.CreateKeyspace(context.Background(), "ks", &pbt.Keyspace{
@@ -195,7 +195,7 @@ func TestSplitDiff(t *testing.T) {
 // We need to use FakeTabletManagerClient because we don't
 // have a good way to fake the binlog player yet, which is
 // necessary for synchronizing replication.
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient())
 excludedTable := "excludedTable1"
 gwrk, err := commandSplitDiff(wi, wr, subFlags, []string{
 "-exclude_tables", excludedTable,

@@ -229,7 +229,7 @@ func TestVerticalSplitClonePopulateBlpCheckpoint(t *testing.T) {
 func testVerticalSplitClone(t *testing.T, strategy string) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wi := NewInstance(ts, "cell1", time.Second, time.Second)
+wi := NewInstance(ts, "cell1", time.Second)
 
 sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
 pbt.TabletType_MASTER, db, testlib.TabletKeyspaceShard(t, "source_ks", "0"))

@@ -85,7 +85,7 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *p
 func TestVerticalSplitDiff(t *testing.T) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wi := NewInstance(ts, "cell1", time.Second, time.Second)
+wi := NewInstance(ts, "cell1", time.Second)
 ctx := context.Background()
 
 sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
@@ -139,7 +139,7 @@ func TestVerticalSplitDiff(t *testing.T) {
 // We need to use FakeTabletManagerClient because we don't
 // have a good way to fake the binlog player yet, which is
 // necessary for synchronizing replication.
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient())
 excludedTable := "excludedTable1"
 subFlags := flag.NewFlagSet("VerticalSplitDiff", flag.ContinueOnError)
 gwrk, err := commandVerticalSplitDiff(wi, wr, subFlags, []string{

@@ -30,7 +30,7 @@ func init() {
 // CreateWorkerInstance returns a properly configured vtworker instance.
 func CreateWorkerInstance(t *testing.T) *worker.Instance {
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-return worker.NewInstance(ts, "cell1", 30*time.Second, 1*time.Second)
+return worker.NewInstance(ts, "cell1", 1*time.Second)
 }
 
 // TestSuite runs the test suite on the given vtworker and vtworkerclient

@@ -26,8 +26,6 @@ import (
 // keyspace related methods for Wrangler
 
 func (wr *Wrangler) lockKeyspace(ctx context.Context, keyspace string, actionNode *actionnode.ActionNode) (lockPath string, err error) {
-ctx, cancel := context.WithTimeout(ctx, wr.lockTimeout)
-defer cancel()
 return actionNode.LockKeyspace(ctx, wr.ts, keyspace)
 }

@@ -23,7 +23,7 @@ import (
 // RebuildShardGraph rebuilds the serving and replication rollup data while locking
 // out other changes.
 func (wr *Wrangler) RebuildShardGraph(ctx context.Context, keyspace, shard string, cells []string) (*topo.ShardInfo, error) {
-return topotools.RebuildShard(ctx, wr.logger, wr.ts, keyspace, shard, cells, wr.lockTimeout)
+return topotools.RebuildShard(ctx, wr.logger, wr.ts, keyspace, shard, cells)
 }
 
 // RebuildKeyspaceGraph rebuilds the serving graph data while locking out other changes.

@@ -18,8 +18,6 @@ import (
 // shard related methods for Wrangler
 
 func (wr *Wrangler) lockShard(ctx context.Context, keyspace, shard string, actionNode *actionnode.ActionNode) (lockPath string, err error) {
-ctx, cancel := context.WithTimeout(ctx, wr.lockTimeout)
-defer cancel()
 return actionNode.LockShard(ctx, wr.ts, keyspace, shard)
 }

@@ -10,7 +10,6 @@ import (
 "os"
 "path"
 "testing"
-"time"
 
 mproto "github.com/youtube/vitess/go/mysql/proto"
 "github.com/youtube/vitess/go/vt/logutil"
@@ -33,7 +32,7 @@ func TestBackupRestore(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -7,7 +7,6 @@ package testlib
 import (
 "fmt"
 "testing"
-"time"
 
 "golang.org/x/net/context"
@@ -103,7 +102,7 @@ func TestCopySchemaShard_UseShardAsSource(t *testing.T) {
 func copySchema(t *testing.T, useShardAsSource bool) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -25,7 +25,7 @@ import (
 func TestEmergencyReparentShard(t *testing.T) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()
 
@@ -146,7 +146,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // Create a master, a couple good slaves
 oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)

@@ -29,7 +29,7 @@ func TestInitMasterShard(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()
 
@@ -129,7 +129,7 @@ func TestInitMasterShardChecks(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 master := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)
 
@@ -167,7 +167,7 @@ func TestInitMasterShardOneSlaveFails(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // Create a master, a couple slaves
 master := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)

@@ -7,7 +7,6 @@ package testlib
 import (
 "reflect"
 "testing"
-"time"
 
 mproto "github.com/youtube/vitess/go/mysql/proto"
 "github.com/youtube/vitess/go/sqltypes"
@@ -26,7 +25,7 @@ func TestMigrateServedFrom(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -6,7 +6,6 @@ package testlib
 
 import (
 "testing"
-"time"
 
 mproto "github.com/youtube/vitess/go/mysql/proto"
 "github.com/youtube/vitess/go/sqltypes"
@@ -36,7 +35,7 @@ func checkShardServedTypes(t *testing.T, ts topo.Server, shard string, expected
 func TestMigrateServedTypes(t *testing.T) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -7,7 +7,6 @@ package testlib
 import (
 "strings"
 "testing"
-"time"
 
 mproto "github.com/youtube/vitess/go/mysql/proto"
 "github.com/youtube/vitess/go/sqltypes"
@@ -26,7 +25,7 @@ func TestPermissions(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -7,7 +7,6 @@ package testlib
 import (
 "fmt"
 "testing"
-"time"
 
 "github.com/youtube/vitess/go/vt/logutil"
 myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@@ -24,7 +23,7 @@ import (
 func TestPlannedReparentShard(t *testing.T) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -33,7 +33,7 @@ func TestTabletExternallyReparented(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()
 
@@ -171,7 +171,7 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // Create an old master, a new master, two good slaves, one bad slave
 oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)
@@ -220,7 +220,7 @@ func TestTabletExternallyReparentedContinueOnUnexpectedMaster(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // Create an old master, a new master, two good slaves, one bad slave
 oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)
@@ -263,7 +263,7 @@ func TestTabletExternallyReparentedFailedOldMaster(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // Create an old master, a new master, and a good slave.
 oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)

@@ -7,7 +7,6 @@ package testlib
 import (
 "fmt"
 "testing"
-"time"
 
 "github.com/youtube/vitess/go/vt/logutil"
 myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@@ -25,7 +24,7 @@ func TestShardReplicationStatuses(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // create shard and tablets
 if err := ts.CreateShard(ctx, "test_keyspace", "0"); err != nil {
@@ -94,7 +93,7 @@ func TestReparentTablet(t *testing.T) {
 ctx := context.Background()
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 
 // create shard and tablets
 if err := ts.CreateShard(ctx, "test_keyspace", "0"); err != nil {

@@ -10,7 +10,6 @@ import (
 "net/http"
 "strings"
 "testing"
-"time"
 
 "github.com/youtube/vitess/go/vt/logutil"
 "github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
@@ -52,7 +51,7 @@ func TestVersion(t *testing.T) {
 // Initialize our environment
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -80,7 +80,7 @@ func TestWaitForFilteredReplication_unhealthy(t *testing.T) {
 func waitForFilteredReplication(t *testing.T, expectedErr string, initialStats *pbq.RealtimeStats, broadcastStatsFunc func() *pbq.RealtimeStats) {
 db := fakesqldb.Register()
 ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
-wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
+wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
 vp := NewVtctlPipe(t, ts)
 defer vp.Close()

@@ -7,8 +7,6 @@
 package wrangler
 
 import (
-"time"
-
 "github.com/youtube/vitess/go/vt/logutil"
 "github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
 "github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
@@ -29,25 +27,17 @@ var (
 // Multiple go routines can use the same Wrangler at the same time,
 // provided they want to share the same logger / topo server / lock timeout.
 type Wrangler struct {
-logger      logutil.Logger
-ts          topo.Server
-tmc         tmclient.TabletManagerClient
-lockTimeout time.Duration
+logger logutil.Logger
+ts     topo.Server
+tmc    tmclient.TabletManagerClient
 }
 
 // New creates a new Wrangler object.
-//
-// lockTimeout: how long should we wait for the initial lock to start
-// a complex action? This is distinct from the context timeout because most
-// of the time, we want to immediately know that our action will
-// fail. However, automated action will need some time to arbitrate
-// the locks.
-func New(logger logutil.Logger, ts topo.Server, tmc tmclient.TabletManagerClient, lockTimeout time.Duration) *Wrangler {
+func New(logger logutil.Logger, ts topo.Server, tmc tmclient.TabletManagerClient) *Wrangler {
 return &Wrangler{
-logger:      logger,
-ts:          ts,
-tmc:         tmc,
-lockTimeout: lockTimeout,
+logger: logger,
+ts:     ts,
+tmc:    tmc,
 }
 }
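For reference, a minimal caller-side sketch after this change (a hypothetical standalone main, not part of the commit): the Wrangler is built without a lock timeout, and the timeout is picked up from actionnode's -lock_timeout flag at the point where a keyspace/shard lock is actually taken.

package main

import (
	"flag"

	"golang.org/x/net/context"

	log "github.com/golang/glog"
	"github.com/youtube/vitess/go/vt/logutil"
	"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
	"github.com/youtube/vitess/go/vt/topo"
	"github.com/youtube/vitess/go/vt/wrangler"
)

func main() {
	flag.Parse() // picks up -lock_timeout along with the binary's other flags

	ts := topo.GetServer()
	defer topo.CloseServers()

	// No lock timeout argument anymore; locking behavior is controlled by the
	// actionnode flag inside the lock helpers.
	wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
	if _, err := wr.RebuildShardGraph(context.Background(), "test_keyspace", "0", nil); err != nil {
		log.Errorf("RebuildShardGraph: %v", err)
	}
}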