From 24fbf86be6d2e0fb93d8e315ce389434b4d0fe42 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Fri, 13 Dec 2013 17:12:04 -0800 Subject: [PATCH] Adding 'checker' type, working on split_diff. --- go/cmd/vtworker/command.go | 2 +- go/cmd/vtworker/interactive.go | 8 ++- go/cmd/vtworker/vtworker.go | 1 + go/vt/topo/tablet.go | 13 +++-- go/vt/worker/split_diff.go | 98 ++++++++++++++++++++++++---------- 5 files changed, 86 insertions(+), 36 deletions(-) diff --git a/go/cmd/vtworker/command.go b/go/cmd/vtworker/command.go index f0f5d53ec2..fd802ef255 100644 --- a/go/cmd/vtworker/command.go +++ b/go/cmd/vtworker/command.go @@ -59,7 +59,7 @@ func commandSplitDiff(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []stri log.Fatalf("command SplitDiff requires ") } keyspace, shard := shardParamToKeyspaceShard(subFlags.Arg(0)) - return worker.NewSplitDiffWorker(wr, keyspace, shard) + return worker.NewSplitDiffWorker(wr, *cell, keyspace, shard) } func commandWorker(wr *wrangler.Wrangler, args []string) worker.Worker { diff --git a/go/cmd/vtworker/interactive.go b/go/cmd/vtworker/interactive.go index a52b89da15..ea95ce0195 100644 --- a/go/cmd/vtworker/interactive.go +++ b/go/cmd/vtworker/interactive.go @@ -133,9 +133,12 @@ func initInteractiveMode(wr *wrangler.Wrangler) { diffsTemplate := loadTemplate("diffs", diffsHTML) splitDiffTemplate := loadTemplate("splitdiff", splitDiffHTML) + // toplevel menu http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { executeTemplate(w, indexTemplate, nil) }) + + // diffs menu and functions http.HandleFunc("/diffs", func(w http.ResponseWriter, r *http.Request) { executeTemplate(w, diffsTemplate, nil) }) @@ -148,8 +151,8 @@ func initInteractiveMode(wr *wrangler.Wrangler) { shard := r.FormValue("shard") if keyspace == "" || shard == "" { + // display the list of possible shards to chose from result := make(map[string]interface{}) - shards, err := shardsWithSources(wr) if err != nil { result["Error"] = err.Error() @@ -159,7 +162,8 @@ func initInteractiveMode(wr *wrangler.Wrangler) { executeTemplate(w, splitDiffTemplate, result) } else { - wrk := worker.NewSplitDiffWorker(wr, keyspace, shard) + // start the diff job + wrk := worker.NewSplitDiffWorker(wr, *cell, keyspace, shard) if _, err := setAndStartWorker(wrk); err != nil { httpError(w, "cannot set worker: %s", err) return diff --git a/go/cmd/vtworker/vtworker.go b/go/cmd/vtworker/vtworker.go index 5dc71c0649..9d4a269ea5 100644 --- a/go/cmd/vtworker/vtworker.go +++ b/go/cmd/vtworker/vtworker.go @@ -30,6 +30,7 @@ import ( ) var ( + cell = flag.String("cell", "", "cell to pick servers from") port = flag.Int("port", 8080, "port for the status / interactive mode") ) diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index bf73d78aa4..700a3e9200 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -155,6 +155,10 @@ const ( // from a snapshot. idle -> restore -> spare TYPE_RESTORE = TabletType("restore") + // A tablet that is running a checker process. It is probably + // lagging in replication. + TYPE_CHECKER = TabletType("checker") + // a machine with data that needs to be wiped TYPE_SCRAP = TabletType("scrap") ) @@ -187,6 +191,7 @@ var SlaveTabletTypes = []TabletType{ TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_RESTORE, + TYPE_CHECKER, } // IsTypeInList returns true if the given type is in the list. @@ -218,9 +223,9 @@ func MakeStringTypeList(types []TabletType) []string { // without changes to the replication graph func IsTrivialTypeChange(oldTabletType, newTabletType TabletType) bool { switch oldTabletType { - case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE: + case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE, TYPE_CHECKER: switch newTabletType { - case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE: + case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE, TYPE_CHECKER: return true } case TYPE_SCRAP: @@ -276,10 +281,10 @@ func IsInReplicationGraph(tt TabletType) bool { // and actively replicating? // MASTER is not obviously (only support one level replication graph) // IDLE and SCRAP are not either -// BACKUP, RESTORE, LAG_ORPHAN may or may not be, but we don't know for sure +// BACKUP, RESTORE, LAG_ORPHAN, TYPE_CHECKER may or may not be, but we don't know for sure func IsSlaveType(tt TabletType) bool { switch tt { - case TYPE_MASTER, TYPE_IDLE, TYPE_SCRAP, TYPE_BACKUP, TYPE_RESTORE, TYPE_LAG_ORPHAN: + case TYPE_MASTER, TYPE_IDLE, TYPE_SCRAP, TYPE_BACKUP, TYPE_RESTORE, TYPE_LAG_ORPHAN, TYPE_CHECKER: return false } return true diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index 984b55b31b..13c8d98ee9 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -7,7 +7,6 @@ package worker import ( "fmt" "sync" - "time" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/wrangler" @@ -16,16 +15,18 @@ import ( const ( // all the states for the worker stateNotSarted = "not started" - stateInit = "initializing" - stateSleeping = "sleeping" stateDone = "done" stateError = "error" + + stateInit = "initializing" + stateFindTargets = "finding target instances" ) // SplitDiffWorker executes a diff between a destination shard and its // source shards in a shard split case. type SplitDiffWorker struct { wr *wrangler.Wrangler + cell string keyspace string shard string @@ -36,17 +37,19 @@ type SplitDiffWorker struct { // populated if state == stateError err error - // populated during stateInit + // populated during stateInit, read-only after that shardInfo *topo.ShardInfo - // populated during sleep - sleepTime int + // populated during stateFindTargets, read-only after that + sourceAliases []topo.TabletAlias + destinationAlias topo.TabletAlias } // NewSplitDiff returns a new SplitDiffWorker object. -func NewSplitDiffWorker(wr *wrangler.Wrangler, keyspace, shard string) Worker { +func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker { return &SplitDiffWorker{ wr: wr, + cell: cell, keyspace: keyspace, shard: shard, @@ -73,8 +76,6 @@ func (sdw *SplitDiffWorker) StatusAsHTML() string { result := "Working on: " + sdw.keyspace + "/" + sdw.shard + "
\n" result += "State: " + sdw.state + "
\n" switch sdw.state { - case stateSleeping: - result += fmt.Sprintf("Sleeping: %v
\n", sdw.sleepTime) case stateError: result += "Error: " + sdw.err.Error() + "
\n" } @@ -88,8 +89,6 @@ func (sdw *SplitDiffWorker) StatusAsText() string { result := "Working on: " + sdw.keyspace + "/" + sdw.shard + "\n" result += "State: " + sdw.state + "\n" switch sdw.state { - case stateSleeping: - result += fmt.Sprintf("Sleeping: %v\n", sdw.sleepTime) case stateError: result += "Error: " + sdw.err.Error() + "\n" } @@ -108,45 +107,86 @@ func (sdw *SplitDiffWorker) CheckInterrupted() bool { func (sdw *SplitDiffWorker) Run() { // first state: read what we need to do - if err := sdw.Init(); err != nil { + if err := sdw.init(); err != nil { sdw.recordError(err) return } - if sdw.CheckInterrupted() { return } - // second state: dummy sleep - sdw.setState(stateSleeping) - for i := 0; i < 10; i++ { - sdw.mu.Lock() - sdw.sleepTime = i - sdw.mu.Unlock() - time.Sleep(time.Second) - if sdw.CheckInterrupted() { - return - } + // second state: find targets + if err := sdw.findTargets(); err != nil { + sdw.recordError(err) + return + } + if sdw.CheckInterrupted() { + return } sdw.setState(stateDone) } -func (sdw *SplitDiffWorker) Init() error { +// init phase: +// - read the shard info, make sure it has sources +func (sdw *SplitDiffWorker) init() error { sdw.setState(stateInit) - shardInfo, err := sdw.wr.TopoServer().GetShard(sdw.keyspace, sdw.shard) + var err error + sdw.shardInfo, err = sdw.wr.TopoServer().GetShard(sdw.keyspace, sdw.shard) if err != nil { return fmt.Errorf("Cannot read shard %v/%v: %v", sdw.keyspace, sdw.shard, err) } - if len(shardInfo.SourceShards) == 0 { + if len(sdw.shardInfo.SourceShards) == 0 { return fmt.Errorf("Shard %v/%v has no source shard", sdw.keyspace, sdw.shard) } - sdw.mu.Lock() - sdw.shardInfo = shardInfo - sdw.mu.Unlock() + return nil +} + +// findTargets phase: +// - find one rdonly per source shard +// - find one rdonly in destination shard +// - mark them all as 'checker' pointing back to us + +func (sdw *SplitDiffWorker) findTarget(shard string) (topo.TabletAlias, error) { + endPoints, err := sdw.wr.TopoServer().GetEndPoints(sdw.cell, sdw.keyspace, sdw.shard, topo.TYPE_RDONLY) + if err != nil { + return topo.TabletAlias{}, fmt.Errorf("GetEndPoints(%v,%v,%v,rdonly) failed: %v", sdw.cell, sdw.keyspace, sdw.shard, err) + } + if len(endPoints.Entries) == 0 { + return topo.TabletAlias{}, fmt.Errorf("No endpoint to chose from in (%v,%v/%v)", sdw.cell, sdw.keyspace, sdw.shard) + } + + tabletAlias := topo.TabletAlias{ + Cell: sdw.cell, + Uid: endPoints.Entries[0].Uid, + } + if err := sdw.wr.ChangeType(tabletAlias, topo.TYPE_CHECKER, false /*force*/); err != nil { + return topo.TabletAlias{}, err + } + return tabletAlias, nil +} + +func (sdw *SplitDiffWorker) findTargets() error { + sdw.setState(stateFindTargets) + + // find an appropriate endpoint in destination shard + var err error + sdw.destinationAlias, err = sdw.findTarget(sdw.shard) + if err != nil { + return err + } + + // find an appropriate endpoint in the source shards + sdw.sourceAliases = make([]topo.TabletAlias, len(sdw.shardInfo.SourceShards)) + for i, ss := range sdw.shardInfo.SourceShards { + sdw.sourceAliases[i], err = sdw.findTarget(ss.Shard) + if err != nil { + return err + } + } return nil }