Fixing the keyRanges for multirestore.

They happened to work for splits, wouldn't have worked for merges.
This commit is contained in:
Alain Jobart 2014-01-15 09:47:33 -08:00
Родитель 08ab756c19
Коммит a7a35961b8
3 изменённых файлов: 28 добавлений и 13 удалений

Просмотреть файл

@ -67,8 +67,8 @@ func multisnapshotCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []st
}
func multiRestoreCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
start := subFlags.String("start", "", "start of the key range")
end := subFlags.String("end", "", "end of the key range")
starts := subFlags.String("starts", "", "starts of the key range")
ends := subFlags.String("ends", "", "ends of the key range")
fetchRetryCount := subFlags.Int("fetch-retry-count", 3, "how many times to retry a failed transfer")
concurrency := subFlags.Int("concurrency", 8, "how many concurrent db inserts to run simultaneously")
fetchConcurrency := subFlags.Int("fetch-concurrency", 4, "how many files to fetch simultaneously")
@ -81,15 +81,25 @@ func multiRestoreCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []str
" writeBinLogs: write all operations to the binlogs")
subFlags.Parse(args)
keyRange, err := key.ParseKeyRangeParts(*start, *end)
if err != nil {
log.Fatalf("Invalid start or end: %v", err)
}
if subFlags.NArg() < 2 {
log.Fatalf("multirestore requires <destination_dbname> <source_host>[/<source_dbname>]... %v", args)
}
startArray := strings.Split(*starts, ",")
endArray := strings.Split(*ends, ",")
if len(startArray) != len(endArray) || len(startArray) != subFlags.NArg()-1 {
log.Fatalf("Need as many starts and ends as source URLs")
}
keyRanges := make([]key.KeyRange, len(startArray))
for i, s := range startArray {
var err error
keyRanges[i], err = key.ParseKeyRangeParts(s, endArray[i])
if err != nil {
log.Fatalf("Invalid start or end: %v", err)
}
}
dbName, dbis := subFlags.Arg(0), subFlags.Args()[1:]
sources := make([]*url.URL, len(dbis))
for i, dbi := range dbis {
@ -102,7 +112,7 @@ func multiRestoreCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []str
}
sources[i] = dbUrl
}
if err := mysqld.MultiRestore(dbName, keyRange, sources, *concurrency, *fetchConcurrency, *insertTableConcurrency, *fetchRetryCount, *strategy); err != nil {
if err := mysqld.MultiRestore(dbName, keyRanges, sources, *concurrency, *fetchConcurrency, *insertTableConcurrency, *fetchRetryCount, *strategy); err != nil {
log.Fatalf("multirestore failed: %v", err)
}
}
@ -225,7 +235,7 @@ var commands = []command{
"[-fetch-concurrency=3] [-fetch-retry-count=3] [-dont-wait-for-slave-start] <snapshot manifest file>",
"Restores a full snapshot"},
command{"multirestore", multiRestoreCmd,
"[-force] [-concurrency=3] [-fetch-concurrency=4] [-insert-table-concurrency=4] [-fetch-retry-count=3] [-start=''] [-end=''] [-strategy=] <destination_dbname> <source_host>[/<source_dbname>]...",
"[-force] [-concurrency=3] [-fetch-concurrency=4] [-insert-table-concurrency=4] [-fetch-retry-count=3] [-starts=start1,start2,...] [-ends=end1,end2,...] [-strategy=] <destination_dbname> <source_host>[/<source_dbname>]...",
"Restores a snapshot form multiple hosts"},
command{"multisnapshot", multisnapshotCmd, "[-concurrency=8] [-spec='-'] [-tables=''] [-skip-slave-restart] [-maximum-file-size=134217728] <db name> <key name>",
"Makes a complete snapshot using 'select * into' commands."},

Просмотреть файл

@ -740,7 +740,7 @@ func buildQueryList(destinationDbName, query string, writeBinLogs bool) []string
// also write to the binary logs.
// - If the strategy contains the command 'populateBlpCheckpoint' then we
// will populate the blp_checkpoint table with master positions to start from
func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRange key.KeyRange, sourceAddrs []*url.URL, snapshotConcurrency, fetchConcurrency, insertTableConcurrency, fetchRetryCount int, strategy string) (err error) {
func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRanges []key.KeyRange, sourceAddrs []*url.URL, snapshotConcurrency, fetchConcurrency, insertTableConcurrency, fetchRetryCount int, strategy string) (err error) {
writeBinLogs := strings.Contains(strategy, "writeBinLogs")
manifests := make([]*SplitSnapshotManifest, len(sourceAddrs))
@ -760,7 +760,7 @@ func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRange key.KeyRan
} else {
sourceDbName = sourceAddr.Path[1:]
}
ssm, e := fetchSnapshotManifestWithRetry("http://"+sourceAddr.Host, sourceDbName, keyRange, fetchRetryCount)
ssm, e := fetchSnapshotManifestWithRetry("http://"+sourceAddr.Host, sourceDbName, keyRanges[i], fetchRetryCount)
manifests[i] = ssm
rc.RecordError(e)
}(sourceAddr, i)

Просмотреть файл

@ -792,12 +792,17 @@ func (ta *TabletActor) multiRestore(actionNode *ActionNode) (err error) {
// get source tablets addresses
sourceAddrs := make([]*url.URL, len(args.SrcTabletAliases))
keyRanges := make([]key.KeyRange, len(args.SrcTabletAliases))
for i, alias := range args.SrcTabletAliases {
t, e := ta.ts.GetTablet(alias)
if e != nil {
return e
}
sourceAddrs[i] = &url.URL{Host: t.GetAddr(), Path: "/" + t.DbName()}
keyRanges[i], e = key.KeyRangesOverlap(tablet.KeyRange, t.KeyRange)
if e != nil {
return e
}
}
// change type to restore, no change to replication graph
@ -809,7 +814,7 @@ func (ta *TabletActor) multiRestore(actionNode *ActionNode) (err error) {
}
// run the action, scrap if it fails
if err := ta.mysqld.MultiRestore(tablet.DbName(), tablet.KeyRange, sourceAddrs, args.Concurrency, args.FetchConcurrency, args.InsertTableConcurrency, args.FetchRetryCount, args.Strategy); err != nil {
if err := ta.mysqld.MultiRestore(tablet.DbName(), keyRanges, sourceAddrs, args.Concurrency, args.FetchConcurrency, args.InsertTableConcurrency, args.FetchRetryCount, args.Strategy); err != nil {
if e := Scrap(ta.ts, ta.tabletAlias, false); e != nil {
log.Errorf("Failed to Scrap after failed RestoreFromMultiSnapshot: %v", e)
}