Adding 'checker' type, working on split_diff.

This commit is contained in:
Alain Jobart 2013-12-13 17:12:04 -08:00
Родитель e10a808765
Коммит 24fbf86be6
5 изменённых файлов: 86 добавлений и 36 удалений

Просмотреть файл

@ -59,7 +59,7 @@ func commandSplitDiff(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []stri
log.Fatalf("command SplitDiff requires <keyspace/shard|zk shard path>") log.Fatalf("command SplitDiff requires <keyspace/shard|zk shard path>")
} }
keyspace, shard := shardParamToKeyspaceShard(subFlags.Arg(0)) keyspace, shard := shardParamToKeyspaceShard(subFlags.Arg(0))
return worker.NewSplitDiffWorker(wr, keyspace, shard) return worker.NewSplitDiffWorker(wr, *cell, keyspace, shard)
} }
func commandWorker(wr *wrangler.Wrangler, args []string) worker.Worker { func commandWorker(wr *wrangler.Wrangler, args []string) worker.Worker {

Просмотреть файл

@ -133,9 +133,12 @@ func initInteractiveMode(wr *wrangler.Wrangler) {
diffsTemplate := loadTemplate("diffs", diffsHTML) diffsTemplate := loadTemplate("diffs", diffsHTML)
splitDiffTemplate := loadTemplate("splitdiff", splitDiffHTML) splitDiffTemplate := loadTemplate("splitdiff", splitDiffHTML)
// toplevel menu
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
executeTemplate(w, indexTemplate, nil) executeTemplate(w, indexTemplate, nil)
}) })
// diffs menu and functions
http.HandleFunc("/diffs", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/diffs", func(w http.ResponseWriter, r *http.Request) {
executeTemplate(w, diffsTemplate, nil) executeTemplate(w, diffsTemplate, nil)
}) })
@ -148,8 +151,8 @@ func initInteractiveMode(wr *wrangler.Wrangler) {
shard := r.FormValue("shard") shard := r.FormValue("shard")
if keyspace == "" || shard == "" { if keyspace == "" || shard == "" {
// display the list of possible shards to chose from
result := make(map[string]interface{}) result := make(map[string]interface{})
shards, err := shardsWithSources(wr) shards, err := shardsWithSources(wr)
if err != nil { if err != nil {
result["Error"] = err.Error() result["Error"] = err.Error()
@ -159,7 +162,8 @@ func initInteractiveMode(wr *wrangler.Wrangler) {
executeTemplate(w, splitDiffTemplate, result) executeTemplate(w, splitDiffTemplate, result)
} else { } else {
wrk := worker.NewSplitDiffWorker(wr, keyspace, shard) // start the diff job
wrk := worker.NewSplitDiffWorker(wr, *cell, keyspace, shard)
if _, err := setAndStartWorker(wrk); err != nil { if _, err := setAndStartWorker(wrk); err != nil {
httpError(w, "cannot set worker: %s", err) httpError(w, "cannot set worker: %s", err)
return return

Просмотреть файл

@ -30,6 +30,7 @@ import (
) )
var ( var (
cell = flag.String("cell", "", "cell to pick servers from")
port = flag.Int("port", 8080, "port for the status / interactive mode") port = flag.Int("port", 8080, "port for the status / interactive mode")
) )

Просмотреть файл

@ -155,6 +155,10 @@ const (
// from a snapshot. idle -> restore -> spare // from a snapshot. idle -> restore -> spare
TYPE_RESTORE = TabletType("restore") TYPE_RESTORE = TabletType("restore")
// A tablet that is running a checker process. It is probably
// lagging in replication.
TYPE_CHECKER = TabletType("checker")
// a machine with data that needs to be wiped // a machine with data that needs to be wiped
TYPE_SCRAP = TabletType("scrap") TYPE_SCRAP = TabletType("scrap")
) )
@ -187,6 +191,7 @@ var SlaveTabletTypes = []TabletType{
TYPE_BACKUP, TYPE_BACKUP,
TYPE_SNAPSHOT_SOURCE, TYPE_SNAPSHOT_SOURCE,
TYPE_RESTORE, TYPE_RESTORE,
TYPE_CHECKER,
} }
// IsTypeInList returns true if the given type is in the list. // IsTypeInList returns true if the given type is in the list.
@ -218,9 +223,9 @@ func MakeStringTypeList(types []TabletType) []string {
// without changes to the replication graph // without changes to the replication graph
func IsTrivialTypeChange(oldTabletType, newTabletType TabletType) bool { func IsTrivialTypeChange(oldTabletType, newTabletType TabletType) bool {
switch oldTabletType { switch oldTabletType {
case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE: case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE, TYPE_CHECKER:
switch newTabletType { switch newTabletType {
case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE: case TYPE_REPLICA, TYPE_RDONLY, TYPE_BATCH, TYPE_SPARE, TYPE_LAG, TYPE_LAG_ORPHAN, TYPE_BACKUP, TYPE_SNAPSHOT_SOURCE, TYPE_EXPERIMENTAL, TYPE_SCHEMA_UPGRADE, TYPE_CHECKER:
return true return true
} }
case TYPE_SCRAP: case TYPE_SCRAP:
@ -276,10 +281,10 @@ func IsInReplicationGraph(tt TabletType) bool {
// and actively replicating? // and actively replicating?
// MASTER is not obviously (only support one level replication graph) // MASTER is not obviously (only support one level replication graph)
// IDLE and SCRAP are not either // IDLE and SCRAP are not either
// BACKUP, RESTORE, LAG_ORPHAN may or may not be, but we don't know for sure // BACKUP, RESTORE, LAG_ORPHAN, TYPE_CHECKER may or may not be, but we don't know for sure
func IsSlaveType(tt TabletType) bool { func IsSlaveType(tt TabletType) bool {
switch tt { switch tt {
case TYPE_MASTER, TYPE_IDLE, TYPE_SCRAP, TYPE_BACKUP, TYPE_RESTORE, TYPE_LAG_ORPHAN: case TYPE_MASTER, TYPE_IDLE, TYPE_SCRAP, TYPE_BACKUP, TYPE_RESTORE, TYPE_LAG_ORPHAN, TYPE_CHECKER:
return false return false
} }
return true return true

Просмотреть файл

@ -7,7 +7,6 @@ package worker
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/wrangler" "github.com/youtube/vitess/go/vt/wrangler"
@ -16,16 +15,18 @@ import (
const ( const (
// all the states for the worker // all the states for the worker
stateNotSarted = "not started" stateNotSarted = "not started"
stateInit = "initializing"
stateSleeping = "sleeping"
stateDone = "done" stateDone = "done"
stateError = "error" stateError = "error"
stateInit = "initializing"
stateFindTargets = "finding target instances"
) )
// SplitDiffWorker executes a diff between a destination shard and its // SplitDiffWorker executes a diff between a destination shard and its
// source shards in a shard split case. // source shards in a shard split case.
type SplitDiffWorker struct { type SplitDiffWorker struct {
wr *wrangler.Wrangler wr *wrangler.Wrangler
cell string
keyspace string keyspace string
shard string shard string
@ -36,17 +37,19 @@ type SplitDiffWorker struct {
// populated if state == stateError // populated if state == stateError
err error err error
// populated during stateInit // populated during stateInit, read-only after that
shardInfo *topo.ShardInfo shardInfo *topo.ShardInfo
// populated during sleep // populated during stateFindTargets, read-only after that
sleepTime int sourceAliases []topo.TabletAlias
destinationAlias topo.TabletAlias
} }
// NewSplitDiff returns a new SplitDiffWorker object. // NewSplitDiff returns a new SplitDiffWorker object.
func NewSplitDiffWorker(wr *wrangler.Wrangler, keyspace, shard string) Worker { func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker {
return &SplitDiffWorker{ return &SplitDiffWorker{
wr: wr, wr: wr,
cell: cell,
keyspace: keyspace, keyspace: keyspace,
shard: shard, shard: shard,
@ -73,8 +76,6 @@ func (sdw *SplitDiffWorker) StatusAsHTML() string {
result := "<b>Working on:</b> " + sdw.keyspace + "/" + sdw.shard + "</br>\n" result := "<b>Working on:</b> " + sdw.keyspace + "/" + sdw.shard + "</br>\n"
result += "<b>State:</b> " + sdw.state + "</br>\n" result += "<b>State:</b> " + sdw.state + "</br>\n"
switch sdw.state { switch sdw.state {
case stateSleeping:
result += fmt.Sprintf("<b>Sleeping</b>: %v</br>\n", sdw.sleepTime)
case stateError: case stateError:
result += "<b>Error</b>: " + sdw.err.Error() + "</br>\n" result += "<b>Error</b>: " + sdw.err.Error() + "</br>\n"
} }
@ -88,8 +89,6 @@ func (sdw *SplitDiffWorker) StatusAsText() string {
result := "Working on: " + sdw.keyspace + "/" + sdw.shard + "\n" result := "Working on: " + sdw.keyspace + "/" + sdw.shard + "\n"
result += "State: " + sdw.state + "\n" result += "State: " + sdw.state + "\n"
switch sdw.state { switch sdw.state {
case stateSleeping:
result += fmt.Sprintf("Sleeping: %v\n", sdw.sleepTime)
case stateError: case stateError:
result += "Error: " + sdw.err.Error() + "\n" result += "Error: " + sdw.err.Error() + "\n"
} }
@ -108,45 +107,86 @@ func (sdw *SplitDiffWorker) CheckInterrupted() bool {
func (sdw *SplitDiffWorker) Run() { func (sdw *SplitDiffWorker) Run() {
// first state: read what we need to do // first state: read what we need to do
if err := sdw.Init(); err != nil { if err := sdw.init(); err != nil {
sdw.recordError(err) sdw.recordError(err)
return return
} }
if sdw.CheckInterrupted() { if sdw.CheckInterrupted() {
return return
} }
// second state: dummy sleep // second state: find targets
sdw.setState(stateSleeping) if err := sdw.findTargets(); err != nil {
for i := 0; i < 10; i++ { sdw.recordError(err)
sdw.mu.Lock() return
sdw.sleepTime = i }
sdw.mu.Unlock() if sdw.CheckInterrupted() {
time.Sleep(time.Second) return
if sdw.CheckInterrupted() {
return
}
} }
sdw.setState(stateDone) sdw.setState(stateDone)
} }
func (sdw *SplitDiffWorker) Init() error { // init phase:
// - read the shard info, make sure it has sources
func (sdw *SplitDiffWorker) init() error {
sdw.setState(stateInit) sdw.setState(stateInit)
shardInfo, err := sdw.wr.TopoServer().GetShard(sdw.keyspace, sdw.shard) var err error
sdw.shardInfo, err = sdw.wr.TopoServer().GetShard(sdw.keyspace, sdw.shard)
if err != nil { if err != nil {
return fmt.Errorf("Cannot read shard %v/%v: %v", sdw.keyspace, sdw.shard, err) return fmt.Errorf("Cannot read shard %v/%v: %v", sdw.keyspace, sdw.shard, err)
} }
if len(shardInfo.SourceShards) == 0 { if len(sdw.shardInfo.SourceShards) == 0 {
return fmt.Errorf("Shard %v/%v has no source shard", sdw.keyspace, sdw.shard) return fmt.Errorf("Shard %v/%v has no source shard", sdw.keyspace, sdw.shard)
} }
sdw.mu.Lock() return nil
sdw.shardInfo = shardInfo }
sdw.mu.Unlock()
// findTargets phase:
// - find one rdonly per source shard
// - find one rdonly in destination shard
// - mark them all as 'checker' pointing back to us
func (sdw *SplitDiffWorker) findTarget(shard string) (topo.TabletAlias, error) {
endPoints, err := sdw.wr.TopoServer().GetEndPoints(sdw.cell, sdw.keyspace, sdw.shard, topo.TYPE_RDONLY)
if err != nil {
return topo.TabletAlias{}, fmt.Errorf("GetEndPoints(%v,%v,%v,rdonly) failed: %v", sdw.cell, sdw.keyspace, sdw.shard, err)
}
if len(endPoints.Entries) == 0 {
return topo.TabletAlias{}, fmt.Errorf("No endpoint to chose from in (%v,%v/%v)", sdw.cell, sdw.keyspace, sdw.shard)
}
tabletAlias := topo.TabletAlias{
Cell: sdw.cell,
Uid: endPoints.Entries[0].Uid,
}
if err := sdw.wr.ChangeType(tabletAlias, topo.TYPE_CHECKER, false /*force*/); err != nil {
return topo.TabletAlias{}, err
}
return tabletAlias, nil
}
func (sdw *SplitDiffWorker) findTargets() error {
sdw.setState(stateFindTargets)
// find an appropriate endpoint in destination shard
var err error
sdw.destinationAlias, err = sdw.findTarget(sdw.shard)
if err != nil {
return err
}
// find an appropriate endpoint in the source shards
sdw.sourceAliases = make([]topo.TabletAlias, len(sdw.shardInfo.SourceShards))
for i, ss := range sdw.shardInfo.SourceShards {
sdw.sourceAliases[i], err = sdw.findTarget(ss.Shard)
if err != nil {
return err
}
}
return nil return nil
} }