Deleting old Snapshot/Restore/Clone code.

Alain Jobart 2015-05-20 09:26:52 -07:00
Parent e15f38cd39
Commit 9b9f96abd8
18 changed files with 6 additions and 2259 deletions

View file

@@ -42,26 +42,6 @@ func initCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) err
return nil
}
func restoreCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
dontWaitForSlaveStart := subFlags.Bool("dont_wait_for_slave_start", false, "won't wait for replication to start (useful when restoring from master server)")
fetchConcurrency := subFlags.Int("fetch_concurrency", 3, "how many files to fetch simultaneously")
fetchRetryCount := subFlags.Int("fetch_retry_count", 3, "how many times to retry a failed transfer")
subFlags.Parse(args)
if subFlags.NArg() != 1 {
return fmt.Errorf("Command restore requires <snapshot manifest file>")
}
rs, err := mysqlctl.ReadSnapshotManifest(subFlags.Arg(0))
if err != nil {
return fmt.Errorf("restore failed: ReadSnapshotManifest: %v", err)
}
err = mysqld.RestoreFromSnapshot(logutil.NewConsoleLogger(), rs, *fetchConcurrency, *fetchRetryCount, *dontWaitForSlaveStart, nil)
if err != nil {
return fmt.Errorf("restore failed: RestoreFromSnapshot: %v", err)
}
return nil
}
func shutdownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown")
subFlags.Parse(args)
@@ -72,50 +52,6 @@ func shutdownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string)
return nil
}
func snapshotCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
concurrency := subFlags.Int("concurrency", 4, "how many compression jobs to run simultaneously")
subFlags.Parse(args)
if subFlags.NArg() != 1 {
return fmt.Errorf("Command snapshot requires <db name>")
}
filename, _, _, err := mysqld.CreateSnapshot(logutil.NewConsoleLogger(), subFlags.Arg(0), tabletAddr, false, *concurrency, false, nil)
if err != nil {
return fmt.Errorf("snapshot failed: %v", err)
}
log.Infof("manifest location: %v", filename)
return nil
}
func snapshotSourceStartCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
concurrency := subFlags.Int("concurrency", 4, "how many checksum jobs to run simultaneously")
subFlags.Parse(args)
if subFlags.NArg() != 1 {
return fmt.Errorf("Command snapshotsourcestart requires <db name>")
}
filename, slaveStartRequired, readOnly, err := mysqld.CreateSnapshot(logutil.NewConsoleLogger(), subFlags.Arg(0), tabletAddr, false, *concurrency, true, nil)
if err != nil {
return fmt.Errorf("snapshot failed: %v", err)
}
log.Infof("manifest location: %v", filename)
log.Infof("slave start required: %v", slaveStartRequired)
log.Infof("read only: %v", readOnly)
return nil
}
func snapshotSourceEndCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
slaveStartRequired := subFlags.Bool("slave_start", false, "will restart replication")
readWrite := subFlags.Bool("read_write", false, "will make the server read-write")
subFlags.Parse(args)
err := mysqld.SnapshotSourceEnd(*slaveStartRequired, !(*readWrite), true, map[string]string{})
if err != nil {
return fmt.Errorf("snapshotsourceend failed: %v", err)
}
return nil
}
func startCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup")
subFlags.Parse(args)
@@ -188,19 +124,6 @@ var commands = []command{
command{"shutdown", shutdownCmd, "[-wait_time=20s]",
"Shuts down mysqld, does not remove any file"},
command{"snapshot", snapshotCmd,
"[-concurrency=4] <db name>",
"Takes a full snapshot, copying the innodb data files"},
command{"snapshotsourcestart", snapshotSourceStartCmd,
"[-concurrency=4] <db name>",
"Enters snapshot server mode (mysqld stopped, serving innodb data files)"},
command{"snapshotsourceend", snapshotSourceEndCmd,
"[-slave_start] [-read_write]",
"Gets out of snapshot server mode"},
command{"restore", restoreCmd,
"[-fetch_concurrency=3] [-fetch_retry_count=3] [-dont_wait_for_slave_start] <snapshot manifest file>",
"Restores a full snapshot"},
command{"position", positionCmd,
"<operation> <pos1> <pos2 | gtid>",
"Compute operations on replication positions"},

View file

@@ -108,7 +108,6 @@ func main() {
exit.Return(1)
}
tabletmanager.HttpHandleSnapshots(mycnf, tabletAlias.Uid)
servenv.OnRun(func() {
addStatusParts(qsc)
})

View file

@@ -39,6 +39,11 @@ const (
backupManifest = "MANIFEST"
)
const (
// slaveStartDeadline is the deadline for starting a slave
slaveStartDeadline = 30
)
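The only lines this commit adds are the slaveStartDeadline constant above, relocated into backup.go so the backup/restore path can keep waiting for replication to start after the clone code below is deleted. For illustration, a minimal sketch of a deadline-bounded wait follows; the replicationChecker interface and waitForSlaveStart helper are invented for the example and are not the actual Vitess WaitForSlaveStart implementation.

package sketch

import (
	"fmt"
	"time"
)

// replicationChecker is a stand-in for the real Mysqld type (assumption for the sketch).
type replicationChecker interface {
	// SlaveRunning reports whether both replication threads are running.
	SlaveRunning() (bool, error)
}

// waitForSlaveStart polls once per second until replication is running or the
// deadline (in seconds, mirroring slaveStartDeadline = 30) expires.
func waitForSlaveStart(c replicationChecker, deadlineSeconds int) error {
	deadline := time.Now().Add(time.Duration(deadlineSeconds) * time.Second)
	for time.Now().Before(deadline) {
		running, err := c.SlaveRunning()
		if err != nil {
			return err
		}
		if running {
			return nil
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("slave did not start within %d seconds", deadlineSeconds)
}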
var (
// ErrNoBackup is returned when there is no backup
ErrNoBackup = errors.New("no available backup")

View file

@@ -1,474 +0,0 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mysqlctl
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/ioutil2"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
// These methods deal with cloning a running instance of mysql.
const (
maxLagSeconds = 5
)
const (
// slaveStartDeadline is the deadline for starting a slave
slaveStartDeadline = 30
)
const (
// SnapshotManifestFile is the file name for the snapshot manifest.
SnapshotManifestFile = "snapshot_manifest.json"
// SnapshotURLPath is the URL where to find the snapshot manifest.
SnapshotURLPath = "/snapshot"
)
// Validate that this instance is a reasonable source of data.
func (mysqld *Mysqld) validateCloneSource(serverMode bool, hookExtraEnv map[string]string) error {
// NOTE(msolomon) Removing this check for now - I don't see the value of validating this.
// // needs to be master, or slave that's not too far behind
// slaveStatus, err := mysqld.slaveStatus()
// if err != nil {
// if err != ErrNotSlave {
// return fmt.Errorf("mysqlctl: validateCloneSource failed, %v", err)
// }
// } else {
// lagSeconds, _ := strconv.Atoi(slaveStatus["seconds_behind_master"])
// if lagSeconds > maxLagSeconds {
// return fmt.Errorf("mysqlctl: validateCloneSource failed, lag_seconds exceed maximum tolerance (%v)", lagSeconds)
// }
// }
// make sure we can write locally
if err := mysqld.ValidateSnapshotPath(); err != nil {
return err
}
// run a hook to check local things
// FIXME(alainjobart) What other parameters do we have to
// provide? dbname, host, socket?
params := make([]string, 0, 1)
if serverMode {
params = append(params, "--server-mode")
}
h := hook.NewHook("preflight_snapshot", params)
h.ExtraEnv = hookExtraEnv
if err := h.ExecuteOptional(); err != nil {
return err
}
// FIXME(msolomon) check free space based on an estimate of the current
// size of the db files.
// Also, check that we aren't already cloning/compressing or acting as a
// source. Mysqld being down isn't enough, presumably that will be
// restarted as soon as the snapshot is taken.
return nil
}
// ValidateCloneTarget makes sure this mysql daemon is a valid target
// for a clone.
func (mysqld *Mysqld) ValidateCloneTarget(hookExtraEnv map[string]string) error {
// run a hook to check local things
h := hook.NewSimpleHook("preflight_restore")
h.ExtraEnv = hookExtraEnv
if err := h.ExecuteOptional(); err != nil {
return err
}
qr, err := mysqld.FetchSuperQuery("SHOW DATABASES")
if err != nil {
return fmt.Errorf("mysqlctl: ValidateCloneTarget failed, %v", err)
}
for _, row := range qr.Rows {
if strings.HasPrefix(row[0].String(), "vt_") {
dbName := row[0].String()
tableQr, err := mysqld.FetchSuperQuery("SHOW TABLES FROM " + dbName)
if err != nil {
return fmt.Errorf("mysqlctl: ValidateCloneTarget failed, %v", err)
} else if len(tableQr.Rows) == 0 {
// no tables == empty db, all is well
continue
}
return fmt.Errorf("mysqlctl: ValidateCloneTarget failed, found active db %v", dbName)
}
}
return nil
}
func findFilesToServe(srcDir, dstDir string, compress bool) ([]string, []string, error) {
fiList, err := ioutil.ReadDir(srcDir)
if err != nil {
return nil, nil, err
}
sources := make([]string, 0, len(fiList))
destinations := make([]string, 0, len(fiList))
for _, fi := range fiList {
if !fi.IsDir() {
srcPath := path.Join(srcDir, fi.Name())
var dstPath string
if compress {
dstPath = path.Join(dstDir, fi.Name()+".gz")
} else {
dstPath = path.Join(dstDir, fi.Name())
}
sources = append(sources, srcPath)
destinations = append(destinations, dstPath)
}
}
return sources, destinations, nil
}
func (mysqld *Mysqld) createSnapshot(logger logutil.Logger, concurrency int, serverMode bool) ([]SnapshotFile, error) {
sources := make([]string, 0, 128)
destinations := make([]string, 0, 128)
// clean out and start fresh
logger.Infof("removing previous snapshots: %v", mysqld.SnapshotDir)
if err := os.RemoveAll(mysqld.SnapshotDir); err != nil {
return nil, err
}
// FIXME(msolomon) innodb paths must match patterns in mycnf -
// probably belongs as a derived path.
type snapPair struct{ srcDir, dstDir string }
dps := []snapPair{
{mysqld.config.InnodbDataHomeDir, path.Join(mysqld.SnapshotDir, innodbDataSubdir)},
{mysqld.config.InnodbLogGroupHomeDir, path.Join(mysqld.SnapshotDir, innodbLogSubdir)},
}
dataDirEntries, err := ioutil.ReadDir(mysqld.config.DataDir)
if err != nil {
return nil, err
}
for _, de := range dataDirEntries {
dbDirPath := path.Join(mysqld.config.DataDir, de.Name())
// If this is not a directory, try to eval it as a symlink.
if !de.IsDir() {
dbDirPath, err = filepath.EvalSymlinks(dbDirPath)
if err != nil {
return nil, err
}
de, err = os.Stat(dbDirPath)
if err != nil {
return nil, err
}
}
if de.IsDir() {
// Copy anything that defines a db.opt file - that includes empty databases.
_, err := os.Stat(path.Join(dbDirPath, "db.opt"))
if err == nil {
dps = append(dps, snapPair{dbDirPath, path.Join(mysqld.SnapshotDir, dataDir, de.Name())})
} else {
// Look for at least one .frm file
dbDirEntries, err := ioutil.ReadDir(dbDirPath)
if err == nil {
for _, dbEntry := range dbDirEntries {
if strings.HasSuffix(dbEntry.Name(), ".frm") {
dps = append(dps, snapPair{dbDirPath, path.Join(mysqld.SnapshotDir, dataDir, de.Name())})
break
}
}
} else {
return nil, err
}
}
}
}
for _, dp := range dps {
if err := os.MkdirAll(dp.dstDir, 0775); err != nil {
return nil, err
}
if s, d, err := findFilesToServe(dp.srcDir, dp.dstDir, !serverMode); err != nil {
return nil, err
} else {
sources = append(sources, s...)
destinations = append(destinations, d...)
}
}
return newSnapshotFiles(sources, destinations, mysqld.SnapshotDir, concurrency, !serverMode)
}
// CreateSnapshot runs on the machine acting as the source for the clone.
//
// Check master/slave status and determine restore needs.
// If this instance is a slave, stop replication, otherwise place in read-only mode.
// Record replication position.
// Shutdown mysql
// Check paths for storing data
//
// Depending on the serverMode flag, we do the following:
// serverMode = false:
// Compress /vt/vt_[0-9a-f]+/data/vt_.+
// Compute hash (of compressed files, as we serve .gz files here)
// Place in /vt/clone_src where they will be served by http server (not rpc)
// Restart mysql
// serverMode = true:
// Make symlinks for /vt/vt_[0-9a-f]+/data/vt_.+ to innodb files
// Compute hash (of uncompressed files, as we serve uncompressed files)
// Place symlinks in /vt/clone_src where they will be served by http server
// Leave mysql stopped, return slaveStartRequired, readOnly
func (mysqld *Mysqld) CreateSnapshot(logger logutil.Logger, dbName, sourceAddr string, allowHierarchicalReplication bool, concurrency int, serverMode bool, hookExtraEnv map[string]string) (snapshotManifestURLPath string, slaveStartRequired, readOnly bool, err error) {
if dbName == "" {
return "", false, false, errors.New("CreateSnapshot failed: no database name provided")
}
if err = mysqld.validateCloneSource(serverMode, hookExtraEnv); err != nil {
return
}
// save initial state so we can restore on Start()
slaveStartRequired = false
sourceIsMaster := false
readOnly = true
slaveStatus, err := mysqld.SlaveStatus()
if err == nil {
slaveStartRequired = slaveStatus.SlaveRunning()
} else if err == ErrNotSlave {
sourceIsMaster = true
} else {
// If we can't get any data, just fail.
return
}
readOnly, err = mysqld.IsReadOnly()
if err != nil {
return
}
// Stop sources of writes so we can get a consistent replication position.
// If the source is a slave use the master replication position
// unless we are allowing hierarchical replicas.
masterAddr := ""
var replicationPosition proto.ReplicationPosition
if sourceIsMaster {
if err = mysqld.SetReadOnly(true); err != nil {
return
}
replicationPosition, err = mysqld.MasterPosition()
if err != nil {
return
}
masterAddr = mysqld.IPAddr()
} else {
if err = StopSlave(mysqld, hookExtraEnv); err != nil {
return
}
var slaveStatus proto.ReplicationStatus
slaveStatus, err = mysqld.SlaveStatus()
if err != nil {
return
}
replicationPosition = slaveStatus.Position
// We are a slave, check our replication strategy before
// choosing the master address.
if allowHierarchicalReplication {
masterAddr = mysqld.IPAddr()
} else {
masterAddr, err = mysqld.GetMasterAddr()
if err != nil {
return
}
}
}
if err = mysqld.Shutdown(true, MysqlWaitTime); err != nil {
return
}
var smFile string
dataFiles, snapshotErr := mysqld.createSnapshot(logger, concurrency, serverMode)
if snapshotErr != nil {
logger.Errorf("CreateSnapshot failed: %v", snapshotErr)
} else {
var sm *SnapshotManifest
sm, snapshotErr = newSnapshotManifest(sourceAddr, mysqld.IPAddr(),
masterAddr, dbName, dataFiles, replicationPosition, proto.ReplicationPosition{})
if snapshotErr != nil {
logger.Errorf("CreateSnapshot failed: %v", snapshotErr)
} else {
smFile = path.Join(mysqld.SnapshotDir, SnapshotManifestFile)
if snapshotErr = writeJSON(smFile, sm); snapshotErr != nil {
logger.Errorf("CreateSnapshot failed: %v", snapshotErr)
}
}
}
// restore our state if required
if serverMode && snapshotErr == nil {
logger.Infof("server mode snapshot worked, not restarting mysql")
} else {
if err = mysqld.SnapshotSourceEnd(slaveStartRequired, readOnly, false /*deleteSnapshot*/, hookExtraEnv); err != nil {
return
}
}
if snapshotErr != nil {
return "", slaveStartRequired, readOnly, snapshotErr
}
relative, err := filepath.Rel(mysqld.SnapshotDir, smFile)
if err != nil {
return "", slaveStartRequired, readOnly, nil
}
return path.Join(SnapshotURLPath, relative), slaveStartRequired, readOnly, nil
}
// SnapshotSourceEnd removes the current snapshot, and restarts mysqld.
func (mysqld *Mysqld) SnapshotSourceEnd(slaveStartRequired, readOnly, deleteSnapshot bool, hookExtraEnv map[string]string) error {
if deleteSnapshot {
// clean out our files
log.Infof("removing snapshot links: %v", mysqld.SnapshotDir)
if err := os.RemoveAll(mysqld.SnapshotDir); err != nil {
log.Warningf("failed to remove old snapshot: %v", err)
return err
}
}
// Try to restart mysqld
if err := mysqld.Start(MysqlWaitTime); err != nil {
return err
}
// Restore original mysqld state that we saved above.
if slaveStartRequired {
if err := StartSlave(mysqld, hookExtraEnv); err != nil {
return err
}
// this should be quick, but we might as well just wait
if err := WaitForSlaveStart(mysqld, slaveStartDeadline); err != nil {
return err
}
}
// And set read-only mode
if err := mysqld.SetReadOnly(readOnly); err != nil {
return err
}
return nil
}
func writeJSON(filename string, x interface{}) error {
data, err := json.MarshalIndent(x, " ", " ")
if err != nil {
return err
}
return ioutil2.WriteFileAtomic(filename, data, 0660)
}
// ReadSnapshotManifest reads and unpacks a SnapshotManifest
func ReadSnapshotManifest(filename string) (*SnapshotManifest, error) {
data, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
sm := new(SnapshotManifest)
if err = json.Unmarshal(data, sm); err != nil {
return nil, fmt.Errorf("ReadSnapshotManifest failed: %v %v", filename, err)
}
return sm, nil
}
// RestoreFromSnapshot runs on the presumably empty machine acting as
// the target in the create replica action.
//
// validate target (self)
// shutdown_mysql()
// create temp data directory /vt/target/vt_<keyspace>
// copy compressed data files via HTTP
// verify hash of compressed files
// uncompress into /vt/vt_<target-uid>/data/vt_<keyspace>
// start_mysql()
// clean up compressed files
func (mysqld *Mysqld) RestoreFromSnapshot(logger logutil.Logger, snapshotManifest *SnapshotManifest, fetchConcurrency, fetchRetryCount int, dontWaitForSlaveStart bool, hookExtraEnv map[string]string) error {
if snapshotManifest == nil {
return errors.New("RestoreFromSnapshot: nil snapshotManifest")
}
logger.Infof("ValidateCloneTarget")
if err := mysqld.ValidateCloneTarget(hookExtraEnv); err != nil {
return err
}
logger.Infof("Shutdown mysqld")
if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil {
return err
}
logger.Infof("Fetch snapshot")
if err := mysqld.fetchSnapshot(snapshotManifest, fetchConcurrency, fetchRetryCount); err != nil {
return err
}
logger.Infof("Restart mysqld")
if err := mysqld.Start(MysqlWaitTime); err != nil {
return err
}
cmdList, err := mysqld.StartReplicationCommands(snapshotManifest.ReplicationStatus)
if err != nil {
return err
}
if err := mysqld.ExecuteSuperQueryList(cmdList); err != nil {
return err
}
if !dontWaitForSlaveStart {
if err := WaitForSlaveStart(mysqld, slaveStartDeadline); err != nil {
return err
}
}
h := hook.NewSimpleHook("postflight_restore")
h.ExtraEnv = hookExtraEnv
if err := h.ExecuteOptional(); err != nil {
return err
}
return nil
}
func (mysqld *Mysqld) fetchSnapshot(snapshotManifest *SnapshotManifest, fetchConcurrency, fetchRetryCount int) error {
replicaDbPath := path.Join(mysqld.config.DataDir, snapshotManifest.DbName)
cleanDirs := []string{mysqld.SnapshotDir, replicaDbPath,
mysqld.config.InnodbDataHomeDir, mysqld.config.InnodbLogGroupHomeDir}
// clean out and start fresh
// FIXME(msolomon) this might be changed to allow partial recovery, but at that point
// we are starting to reimplement rsync.
for _, dir := range cleanDirs {
if err := os.RemoveAll(dir); err != nil {
return err
}
if err := os.MkdirAll(dir, 0775); err != nil {
return err
}
}
return fetchFiles(snapshotManifest, mysqld.TabletDir, fetchConcurrency, fetchRetryCount)
}

View file

@@ -5,25 +5,13 @@
package mysqlctl
import (
"bufio"
// "crypto/md5"
"encoding/hex"
"fmt"
"hash"
// "hash/crc64"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/cgzip"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
// Use this to simulate failures in tests
@@ -75,452 +63,3 @@ func newHasher() *hasher {
func (h *hasher) HashString() string {
return hex.EncodeToString(h.Sum(nil))
}
// SnapshotFile describes a file to serve.
// 'Path' is the path component of the URL. SnapshotManifest.Addr is
// the host+port component of the URL.
// If path ends in '.gz', it is compressed.
// Size and Hash are computed on the Path itself
// if TableName is set, this file belongs to that table
type SnapshotFile struct {
Path string
Size int64
Hash string
TableName string
}
type SnapshotFiles []SnapshotFile
// sort.Interface
// we sort by descending file size
func (s SnapshotFiles) Len() int { return len(s) }
func (s SnapshotFiles) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s SnapshotFiles) Less(i, j int) bool { return s[i].Size > s[j].Size }
// This function returns the local file used to store the SnapshotFile,
// relative to the basePath.
// for instance, if the source path is something like:
// /vt/snapshot/vt_0000062344/data/vt_snapshot_test-MA,Mw/vt_insert_test.csv.gz
// we will get everything starting with 'data/...', append it to basepath,
// and remove the .gz extension. So with basePath=myPath, it will return:
// myPath/data/vt_snapshot_test-MA,Mw/vt_insert_test.csv
func (dataFile *SnapshotFile) getLocalFilename(basePath string) string {
filename := path.Join(basePath, dataFile.Path)
// trim compression extension if present
if strings.HasSuffix(filename, ".gz") {
filename = filename[:len(filename)-3]
}
return filename
}
// newSnapshotFile behavior depends on the compress flag:
// - if compress is true , it compresses a single file with gzip, and
// computes the hash on the compressed version.
// - if compress is false, just symlinks and computes the hash on the file
// The source file is always left intact.
// The path of the returned SnapshotFile will be relative
// to root.
func newSnapshotFile(srcPath, dstPath, root string, compress bool) (*SnapshotFile, error) {
// open the source file
srcFile, err := os.OpenFile(srcPath, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer srcFile.Close()
src := bufio.NewReaderSize(srcFile, 2*1024*1024)
var hash string
var size int64
if compress {
log.Infof("newSnapshotFile: starting to compress %v into %v", srcPath, dstPath)
// open the temporary destination file
dir, filePrefix := path.Split(dstPath)
dstFile, err := ioutil.TempFile(dir, filePrefix)
if err != nil {
return nil, err
}
defer func() {
// try to close and delete the file. in the
// success case, the file will already be
// closed and renamed, so all of this would
// fail anyway, no biggie
dstFile.Close()
os.Remove(dstFile.Name())
}()
dst := bufio.NewWriterSize(dstFile, 2*1024*1024)
// create the hasher and the tee on top
hasher := newHasher()
tee := io.MultiWriter(dst, hasher)
// create the gzip compression filter
gzip, err := cgzip.NewWriterLevel(tee, cgzip.Z_BEST_SPEED)
if err != nil {
return nil, err
}
// copy from the file to gzip to tee to output file and hasher
_, err = io.Copy(gzip, src)
if err != nil {
return nil, err
}
// close gzip to flush it
if err = gzip.Close(); err != nil {
return nil, err
}
// close dst manually to flush all buffers to disk
dst.Flush()
dstFile.Close()
hash = hasher.HashString()
// atomically move completed compressed file
err = os.Rename(dstFile.Name(), dstPath)
if err != nil {
return nil, err
}
// and get its size
fi, err := os.Stat(dstPath)
if err != nil {
return nil, err
}
size = fi.Size()
} else {
log.Infof("newSnapshotFile: starting to hash and symlinking %v to %v", srcPath, dstPath)
// get the hash
hasher := newHasher()
_, err = io.Copy(hasher, src)
if err != nil {
return nil, err
}
hash = hasher.HashString()
// do the symlink
err = os.Symlink(srcPath, dstPath)
if err != nil {
return nil, err
}
// and get the size
fi, err := os.Stat(srcPath)
if err != nil {
return nil, err
}
size = fi.Size()
}
log.Infof("clone data ready %v:%v", dstPath, hash)
relativeDst, err := filepath.Rel(root, dstPath)
if err != nil {
return nil, err
}
return &SnapshotFile{relativeDst, size, hash, ""}, nil
}
// newSnapshotFiles processes multiple files in parallel. The Paths of
// the returned SnapshotFiles will be relative to root.
// - if compress is true, we compress the files and compute the hash on
// the compressed version.
// - if compress is false, we symlink the files, and compute the hash on
// the original version.
func newSnapshotFiles(sources, destinations []string, root string, concurrency int, compress bool) ([]SnapshotFile, error) {
if len(sources) != len(destinations) || len(sources) == 0 {
return nil, fmt.Errorf("programming error: bad array lengths: %v %v", len(sources), len(destinations))
}
workQueue := make(chan int, len(sources))
for i := 0; i < len(sources); i++ {
workQueue <- i
}
close(workQueue)
snapshotFiles := make([]SnapshotFile, len(sources))
resultQueue := make(chan error, len(sources))
for i := 0; i < concurrency; i++ {
go func() {
for i := range workQueue {
sf, err := newSnapshotFile(sources[i], destinations[i], root, compress)
if err == nil {
snapshotFiles[i] = *sf
}
resultQueue <- err
}
}()
}
var err error
for i := 0; i < len(sources); i++ {
if compressErr := <-resultQueue; compressErr != nil {
err = compressErr
}
}
// clean up files if we had an error
// FIXME(alainjobart) it seems extreme to delete all files if
// the last one failed. Since we only move the file into
// its destination when it worked, we could assume if the file
// already exists it's good, and re-compute its hash.
if err != nil {
log.Infof("Error happened, deleting all the files we already compressed")
for _, dest := range destinations {
os.Remove(dest)
}
return nil, err
}
return snapshotFiles, nil
}
// a SnapshotManifest describes multiple SnapshotFiles and where
// to get them from.
type SnapshotManifest struct {
Addr string // this is the address of the tabletserver, not mysql
DbName string
Files SnapshotFiles
ReplicationStatus *proto.ReplicationStatus
MasterPosition proto.ReplicationPosition
}
func newSnapshotManifest(addr, mysqlAddr, masterAddr, dbName string, files []SnapshotFile, pos, masterPos proto.ReplicationPosition) (*SnapshotManifest, error) {
nrs, err := proto.NewReplicationStatus(masterAddr)
if err != nil {
return nil, err
}
rs := &SnapshotManifest{
Addr: addr,
DbName: dbName,
Files: files,
ReplicationStatus: nrs,
MasterPosition: masterPos,
}
sort.Sort(rs.Files)
rs.ReplicationStatus.Position = pos
return rs, nil
}
// fetchFile fetches data from the web server. It then sends it to a
// tee, which on one side has a hash checksum reader, and on the other
// a gunzip reader writing to a file. It will compare the hash
// checksum after the copy is done.
func fetchFile(srcUrl, srcHash, dstFilename string) error {
log.Infof("fetchFile: starting to fetch %v from %v", dstFilename, srcUrl)
// open the URL
req, err := http.NewRequest("GET", srcUrl, nil)
if err != nil {
return fmt.Errorf("NewRequest failed for %v: %v", srcUrl, err)
}
// we set the 'gzip' encoding ourselves so the library doesn't
// do it for us and ends up using go gzip (we want to use our own
// cgzip which is much faster)
req.Header.Set("Accept-Encoding", "gzip")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed fetching %v: %v", srcUrl, resp.Status)
}
defer resp.Body.Close()
// see if we need some uncompression
var reader io.Reader = resp.Body
ce := resp.Header.Get("Content-Encoding")
if ce != "" {
if ce == "gzip" {
gz, err := cgzip.NewReader(reader)
if err != nil {
return err
}
defer gz.Close()
reader = gz
} else {
return fmt.Errorf("unsupported Content-Encoding: %v", ce)
}
}
return uncompressAndCheck(reader, srcHash, dstFilename, strings.HasSuffix(srcUrl, ".gz"))
}
// uncompressAndCheck uses the provided reader to read data, and then
// sends it to a tee, which on one side has a hash checksum reader,
// and on the other a gunzip reader writing to a file. It will
// compare the hash checksum after the copy is done.
func uncompressAndCheck(reader io.Reader, srcHash, dstFilename string, needsUncompress bool) error {
// create destination directory
dir, filePrefix := path.Split(dstFilename)
if dirErr := os.MkdirAll(dir, 0775); dirErr != nil {
return dirErr
}
// create a temporary file to uncompress to
dstFile, err := ioutil.TempFile(dir, filePrefix)
if err != nil {
return err
}
defer func() {
// try to close and delete the file.
// in the success case, the file will already be closed
// and renamed, so all of this would fail anyway, no biggie
dstFile.Close()
os.Remove(dstFile.Name())
}()
// create a buffering output
dst := bufio.NewWriterSize(dstFile, 2*1024*1024)
// create hash to write the compressed data to
hasher := newHasher()
// create a Tee: we split the HTTP input into the hasher
// and into the gunziper
tee := io.TeeReader(reader, hasher)
// create the uncompresser
var decompressor io.Reader
if needsUncompress {
gz, err := cgzip.NewReader(tee)
if err != nil {
return err
}
defer gz.Close()
decompressor = gz
} else {
decompressor = tee
}
// see if we need to introduce failures
if simulateFailures {
failureCounter++
if failureCounter%10 == 0 {
return fmt.Errorf("Simulated error")
}
}
// copy the data. Will also write to the hasher
if _, err = io.Copy(dst, decompressor); err != nil {
return err
}
// check the hash
hash := hasher.HashString()
if srcHash != hash {
return fmt.Errorf("hash mismatch for %v, %v != %v", dstFilename, srcHash, hash)
}
// we're good
log.Infof("processed snapshot file: %v", dstFilename)
dst.Flush()
dstFile.Close()
// atomically move uncompressed file
if err := os.Chmod(dstFile.Name(), 0664); err != nil {
return err
}
return os.Rename(dstFile.Name(), dstFilename)
}
// fetchFileWithRetry fetches data from the web server, retrying a few
// times.
func fetchFileWithRetry(srcUrl, srcHash, dstFilename string, fetchRetryCount int) (err error) {
for i := 0; i < fetchRetryCount; i++ {
err = fetchFile(srcUrl, srcHash, dstFilename)
if err == nil {
return nil
}
log.Warningf("fetching snapshot file %v failed (try=%v): %v", dstFilename, i, err)
}
log.Errorf("fetching snapshot file %v failed too many times", dstFilename)
return err
}
// FIXME(msolomon) Should we add deadlines? What really matters more
// than a deadline is probably a sense of progress, more like a
// "progress timeout" - how long will we wait if there is no change in
// received bytes.
// FIXME(alainjobart) support fetching files in chunks: create a new
// struct fileChunk {
// snapshotFile *SnapshotFile
// relatedChunks []*fileChunk
// start,end uint64
// observedCrc32 uint32
// }
// Create a slice of fileChunk objects, populate it:
// For files smaller than <threshold>, create one fileChunk
// For files bigger than <threshold>, create N fileChunks
// (the first one has the list of all the others)
// Fetch them all:
// - change the workqueue to have indexes on the fileChunk slice
// - compute the crc32 while fetching, but don't compare right away
// Collect results the same way, write observedCrc32 in the fileChunk
// For each fileChunk, compare checksum:
// - if single file, compare snapshotFile.hash with observedCrc32
// - if multiple chunks and first chunk, merge observedCrc32, and compare
func fetchFiles(snapshotManifest *SnapshotManifest, destinationPath string, fetchConcurrency, fetchRetryCount int) (err error) {
// create a workQueue, a resultQueue, and the go routines
// to process entries out of workQueue into resultQueue
// the mutex protects the error response
workQueue := make(chan SnapshotFile, len(snapshotManifest.Files))
resultQueue := make(chan error, len(snapshotManifest.Files))
mutex := sync.Mutex{}
for i := 0; i < fetchConcurrency; i++ {
go func() {
for sf := range workQueue {
// if someone else errored out, we skip our job
mutex.Lock()
previousError := err
mutex.Unlock()
if previousError != nil {
resultQueue <- previousError
continue
}
// do our fetch, save the error
filename := sf.getLocalFilename(destinationPath)
furl := "http://" + snapshotManifest.Addr + path.Join(SnapshotURLPath, sf.Path)
fetchErr := fetchFileWithRetry(furl, sf.Hash, filename, fetchRetryCount)
if fetchErr != nil {
mutex.Lock()
err = fetchErr
mutex.Unlock()
}
resultQueue <- fetchErr
}
}()
}
// add the jobs (writing on the channel will block if the queue
// is full, no big deal)
jobCount := 0
for _, fi := range snapshotManifest.Files {
workQueue <- fi
jobCount++
}
close(workQueue)
// read the responses (we guarantee one response per job)
for i := 0; i < jobCount; i++ {
<-resultQueue
}
// clean up files if we had an error
// FIXME(alainjobart) it seems extreme to delete all files if
// the last one failed. Maybe we shouldn't, and if a file already
// exists, we hash it before retransmitting.
if err != nil {
log.Infof("Error happened, deleting all the files we already got")
for _, fi := range snapshotManifest.Files {
filename := fi.getLocalFilename(destinationPath)
os.Remove(filename)
}
}
return err
}
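The FIXME block near the end of this deleted file proposes, but never implements, chunked fetching via a fileChunk struct. A rough Go rendering of that proposal follows; the struct never existed in the tree, and the splitIntoChunks helper plus its threshold and chunkSize parameters are invented here purely to make the idea concrete.

package sketch

// snapshotFile is a trimmed stand-in for the SnapshotFile type above.
type snapshotFile struct {
	Path string
	Size int64
	Hash string
}

// fileChunk mirrors the struct sketched in the FIXME: one chunk per small
// file, N chunks per large file, with the first chunk holding the full list.
type fileChunk struct {
	snapshotFile  *snapshotFile
	relatedChunks []*fileChunk
	start, end    uint64 // byte range [start, end) within the file
	observedCrc32 uint32 // checksum computed while fetching, compared later
}

// splitIntoChunks builds the chunk list the comment describes.
func splitIntoChunks(sf *snapshotFile, threshold, chunkSize uint64) []*fileChunk {
	size := uint64(sf.Size)
	if size <= threshold {
		return []*fileChunk{{snapshotFile: sf, start: 0, end: size}}
	}
	var chunks []*fileChunk
	for start := uint64(0); start < size; start += chunkSize {
		end := start + chunkSize
		if end > size {
			end = size
		}
		chunks = append(chunks, &fileChunk{snapshotFile: sf, start: start, end: end})
	}
	chunks[0].relatedChunks = chunks // first chunk carries the full list
	return chunks
}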

View file

@@ -168,18 +168,6 @@ const (
// TabletActionBackup takes a db backup and stores it into BackupStorage
TabletActionBackup = "Backup"
// TabletActionSnapshot takes a db snapshot
TabletActionSnapshot = "Snapshot"
// TabletActionSnapshotSourceEnd restarts the mysql server
TabletActionSnapshotSourceEnd = "SnapshotSourceEnd"
// TabletActionReserveForRestore will prepare a server for restore
TabletActionReserveForRestore = "ReserveForRestore"
// TabletActionRestore will restore a backup
TabletActionRestore = "Restore"
//
// Shard actions - involve all tablets in a shard.
// These are just descriptive and used for locking / logging.

View file

@@ -49,46 +49,6 @@ type SlaveWasRestartedArgs struct {
Parent topo.TabletAlias
}
// SnapshotArgs is the payload for Snapshot
type SnapshotArgs struct {
Concurrency int
ServerMode bool
ForceMasterSnapshot bool
}
// SnapshotReply is the response for Snapshot
type SnapshotReply struct {
ParentAlias topo.TabletAlias
ManifestPath string
// these two are only used for ServerMode=true full snapshot
SlaveStartRequired bool
ReadOnly bool
}
// SnapshotSourceEndArgs is the payload for SnapshotSourceEnd
type SnapshotSourceEndArgs struct {
SlaveStartRequired bool
ReadOnly bool
OriginalType topo.TabletType
}
// ReserveForRestoreArgs is the payload for ReserveForRestore
type ReserveForRestoreArgs struct {
SrcTabletAlias topo.TabletAlias
}
// RestoreArgs is the payload for Restore
type RestoreArgs struct {
SrcTabletAlias topo.TabletAlias
SrcFilePath string
ParentAlias topo.TabletAlias
FetchConcurrency int
FetchRetryCount int
WasReserved bool
DontWaitForSlaveStart bool
}
// shard action node structures
// ApplySchemaShardArgs is the payload for ApplySchemaShard

View file

@@ -5,19 +5,12 @@
package tabletmanager
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"path"
"strings"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/mysql/proto"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@@ -124,14 +117,6 @@ type RPCAgent interface {
Backup(ctx context.Context, concurrency int, logger logutil.Logger) error
Snapshot(ctx context.Context, args *actionnode.SnapshotArgs, logger logutil.Logger) (*actionnode.SnapshotReply, error)
SnapshotSourceEnd(ctx context.Context, args *actionnode.SnapshotSourceEndArgs) error
ReserveForRestore(ctx context.Context, args *actionnode.ReserveForRestoreArgs) error
Restore(ctx context.Context, args *actionnode.RestoreArgs, logger logutil.Logger) error
// RPC helpers
RPCWrap(ctx context.Context, name string, args, reply interface{}, f func() error) error
RPCWrapLock(ctx context.Context, name string, args, reply interface{}, verbose bool, f func() error) error
@@ -763,270 +748,3 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo
return returnErr
}
// Snapshot takes a db snapshot
// Should be called under RPCWrapLockAction.
func (agent *ActionAgent) Snapshot(ctx context.Context, args *actionnode.SnapshotArgs, logger logutil.Logger) (*actionnode.SnapshotReply, error) {
// update our type to TYPE_BACKUP
tablet, err := agent.TopoServer.GetTablet(agent.TabletAlias)
if err != nil {
return nil, err
}
originalType := tablet.Type
// ForceMasterSnapshot: Normally a master is not a viable tablet
// to snapshot. However, there are degenerate cases where you need
// to override this, for instance the initial clone of a new master.
if tablet.Type == topo.TYPE_MASTER && args.ForceMasterSnapshot {
// In this case, we don't bother recomputing the serving graph.
// All queries will have to fail anyway.
log.Infof("force change type master -> backup")
// There is a legitimate reason to force in the case of a single
// master.
tablet.Tablet.Type = topo.TYPE_BACKUP
err = topo.UpdateTablet(ctx, agent.TopoServer, tablet)
} else {
err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topo.TYPE_BACKUP, make(map[string]string))
}
if err != nil {
return nil, err
}
// let's update our internal state (stop query service and other things)
if err := agent.refreshTablet(ctx, "snapshotStart"); err != nil {
return nil, fmt.Errorf("failed to update state before snaphost: %v", err)
}
// create the loggers: tee to console and source
l := logutil.NewTeeLogger(logutil.NewConsoleLogger(), logger)
// now we can run the backup
filename, slaveStartRequired, readOnly, returnErr := agent.Mysqld.CreateSnapshot(l, tablet.DbName(), tablet.Addr(), false, args.Concurrency, args.ServerMode, agent.hookExtraEnv())
// and change our type to the appropriate value
newType := originalType
if returnErr != nil {
log.Errorf("snapshot failed, restoring tablet type back to %v: %v", newType, returnErr)
} else {
if args.ServerMode {
log.Infof("server mode specified, switching tablet to snapshot_source mode")
newType = topo.TYPE_SNAPSHOT_SOURCE
} else {
log.Infof("change type back after snapshot: %v", newType)
}
}
if originalType == topo.TYPE_MASTER && args.ForceMasterSnapshot && newType != topo.TYPE_SNAPSHOT_SOURCE {
log.Infof("force change type backup -> master: %v", tablet.Alias)
tablet.Tablet.Type = topo.TYPE_MASTER
err = topo.UpdateTablet(ctx, agent.TopoServer, tablet)
} else {
err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, newType, nil)
}
if err != nil {
// failure in changing the topology type is probably worse,
// so returning that (we logged the snapshot error anyway)
returnErr = err
}
// if anything failed, don't return anything
if returnErr != nil {
return nil, returnErr
}
// it all worked, return the required information
sr := &actionnode.SnapshotReply{
ManifestPath: filename,
SlaveStartRequired: slaveStartRequired,
ReadOnly: readOnly,
}
if tablet.Type == topo.TYPE_MASTER {
// If this is a master, this will be the new parent.
sr.ParentAlias = tablet.Alias
} else {
// Otherwise get the master from the shard record
si, err := agent.TopoServer.GetShard(tablet.Keyspace, tablet.Shard)
if err != nil {
return nil, err
}
sr.ParentAlias = si.MasterAlias
}
return sr, nil
}
// SnapshotSourceEnd restores the state of the server after a
// Snapshot(server_mode=true)
// Should be called under RPCWrapLockAction.
func (agent *ActionAgent) SnapshotSourceEnd(ctx context.Context, args *actionnode.SnapshotSourceEndArgs) error {
tablet, err := agent.TopoServer.GetTablet(agent.TabletAlias)
if err != nil {
return err
}
if tablet.Type != topo.TYPE_SNAPSHOT_SOURCE {
return fmt.Errorf("expected snapshot_source type, not %v", tablet.Type)
}
if err := agent.Mysqld.SnapshotSourceEnd(args.SlaveStartRequired, args.ReadOnly, true, agent.hookExtraEnv()); err != nil {
log.Errorf("SnapshotSourceEnd failed, leaving tablet type alone: %v", err)
return err
}
// change the type back
if args.OriginalType == topo.TYPE_MASTER {
// force the master update
tablet.Tablet.Type = topo.TYPE_MASTER
err = topo.UpdateTablet(ctx, agent.TopoServer, tablet)
} else {
err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, args.OriginalType, make(map[string]string))
}
return err
}
// change a tablet type to RESTORE and set all the other arguments.
// from now on, we can go to:
// - back to IDLE if we don't use the tablet at all (after for instance
// a successful ReserveForRestore but a failed Snapshot)
// - to SCRAP if something in the process on the target host fails
// - to SPARE if the clone works
func (agent *ActionAgent) changeTypeToRestore(ctx context.Context, tablet, sourceTablet *topo.TabletInfo, keyRange key.KeyRange) error {
// run the optional preflight_assigned hook
hk := hook.NewSimpleHook("preflight_assigned")
topotools.ConfigureTabletHook(hk, agent.TabletAlias)
if err := hk.ExecuteOptional(); err != nil {
return err
}
// change the type
tablet.Keyspace = sourceTablet.Keyspace
tablet.Shard = sourceTablet.Shard
tablet.Type = topo.TYPE_RESTORE
tablet.KeyRange = keyRange
tablet.DbNameOverride = sourceTablet.DbNameOverride
if err := topo.UpdateTablet(ctx, agent.TopoServer, tablet); err != nil {
return err
}
// and create the replication graph items
return topo.UpdateTabletReplicationData(ctx, agent.TopoServer, tablet.Tablet)
}
// ReserveForRestore reserves the current tablet for an upcoming
// restore operation.
// Should be called under RPCWrapLockAction.
func (agent *ActionAgent) ReserveForRestore(ctx context.Context, args *actionnode.ReserveForRestoreArgs) error {
// first check mysql, no need to go further if we can't restore
if err := agent.Mysqld.ValidateCloneTarget(agent.hookExtraEnv()); err != nil {
return err
}
// read our current tablet, verify its state
tablet, err := agent.TopoServer.GetTablet(agent.TabletAlias)
if err != nil {
return err
}
if tablet.Type != topo.TYPE_IDLE {
return fmt.Errorf("expected idle type, not %v", tablet.Type)
}
// read the source tablet
sourceTablet, err := agent.TopoServer.GetTablet(args.SrcTabletAlias)
if err != nil {
return err
}
return agent.changeTypeToRestore(ctx, tablet, sourceTablet, sourceTablet.KeyRange)
}
func fetchAndParseJSONFile(addr, filename string, result interface{}) error {
// read the manifest
murl := "http://" + addr + filename
resp, err := http.Get(murl)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Error fetching url %v: %v", murl, resp.Status)
}
data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
// unpack it
return json.Unmarshal(data, result)
}
// Restore stops the tablet's mysqld, replaces its data folder with a snapshot,
// and then restarts it.
//
// Check that the SnapshotManifest is valid and the master has not changed.
// Shutdown mysqld.
// Load the snapshot from source tablet.
// Restart mysqld and replication.
// Put tablet into the replication graph as a spare.
// Should be called under RPCWrapLockAction.
func (agent *ActionAgent) Restore(ctx context.Context, args *actionnode.RestoreArgs, logger logutil.Logger) error {
// read our current tablet, verify its state
tablet, err := agent.TopoServer.GetTablet(agent.TabletAlias)
if err != nil {
return err
}
if args.WasReserved {
if tablet.Type != topo.TYPE_RESTORE {
return fmt.Errorf("expected restore type, not %v", tablet.Type)
}
} else {
if tablet.Type != topo.TYPE_IDLE {
return fmt.Errorf("expected idle type, not %v", tablet.Type)
}
}
// read the source tablet, compute args.SrcFilePath if default
sourceTablet, err := agent.TopoServer.GetTablet(args.SrcTabletAlias)
if err != nil {
return err
}
if strings.ToLower(args.SrcFilePath) == "default" {
args.SrcFilePath = path.Join(mysqlctl.SnapshotURLPath, mysqlctl.SnapshotManifestFile)
}
// read the parent tablet, verify its state
parentTablet, err := agent.TopoServer.GetTablet(args.ParentAlias)
if err != nil {
return err
}
if parentTablet.Type != topo.TYPE_MASTER && parentTablet.Type != topo.TYPE_SNAPSHOT_SOURCE {
return fmt.Errorf("restore expected master or snapshot_source parent: %v %v", parentTablet.Type, args.ParentAlias)
}
// read & unpack the manifest
sm := new(mysqlctl.SnapshotManifest)
if err := fetchAndParseJSONFile(sourceTablet.Addr(), args.SrcFilePath, sm); err != nil {
return err
}
if !args.WasReserved {
if err := agent.changeTypeToRestore(ctx, tablet, sourceTablet, sourceTablet.KeyRange); err != nil {
return err
}
}
// create the loggers: tee to console and source
l := logutil.NewTeeLogger(logutil.NewConsoleLogger(), logger)
// do the work
if err := agent.Mysqld.RestoreFromSnapshot(l, sm, args.FetchConcurrency, args.FetchRetryCount, args.DontWaitForSlaveStart, agent.hookExtraEnv()); err != nil {
log.Errorf("RestoreFromSnapshot failed (%v), scrapping", err)
if err := topotools.Scrap(ctx, agent.TopoServer, agent.TabletAlias, false); err != nil {
log.Errorf("Failed to Scrap after failed RestoreFromSnapshot: %v", err)
}
return err
}
// reload the schema
agent.ReloadSchema(ctx)
// change to TYPE_SPARE, we're done!
return topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topo.TYPE_SPARE, nil)
}

View file

@@ -1162,154 +1162,6 @@ func agentRPCTestBackupPanic(ctx context.Context, t *testing.T, client tmclient.
expectRPCWrapLockActionPanic(t, err)
}
var testSnapshotArgs = &actionnode.SnapshotArgs{
Concurrency: 42,
ServerMode: true,
ForceMasterSnapshot: true,
}
var testSnapshotReply = &actionnode.SnapshotReply{
ParentAlias: topo.TabletAlias{
Cell: "test",
Uid: 456,
},
ManifestPath: "path",
SlaveStartRequired: true,
ReadOnly: true,
}
func (fra *fakeRPCAgent) Snapshot(ctx context.Context, args *actionnode.SnapshotArgs, logger logutil.Logger) (*actionnode.SnapshotReply, error) {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "Snapshot args", args, testSnapshotArgs)
logStuff(logger, 0)
return testSnapshotReply, nil
}
func agentRPCTestSnapshot(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
logChannel, errFunc, err := client.Snapshot(ctx, ti, testSnapshotArgs)
if err != nil {
t.Fatalf("Snapshot failed: %v", err)
}
compareLoggedStuff(t, "Snapshot", logChannel, 0)
sr, err := errFunc()
compareError(t, "Snapshot", err, sr, testSnapshotReply)
}
func agentRPCTestSnapshotPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
logChannel, errFunc, err := client.Snapshot(ctx, ti, testSnapshotArgs)
if err != nil {
t.Fatalf("Snapshot failed: %v", err)
}
if e, ok := <-logChannel; ok {
t.Fatalf("Unexpected Snapshot logs: %v", e)
}
_, err = errFunc()
expectRPCWrapLockActionPanic(t, err)
}
var testSnapshotSourceEndArgs = &actionnode.SnapshotSourceEndArgs{
SlaveStartRequired: true,
ReadOnly: true,
OriginalType: topo.TYPE_RDONLY,
}
var testSnapshotSourceEndCalled = false
func (fra *fakeRPCAgent) SnapshotSourceEnd(ctx context.Context, args *actionnode.SnapshotSourceEndArgs) error {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "SnapshotSourceEnd args", args, testSnapshotSourceEndArgs)
testSnapshotSourceEndCalled = true
return nil
}
func agentRPCTestSnapshotSourceEnd(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
err := client.SnapshotSourceEnd(ctx, ti, testSnapshotSourceEndArgs)
compareError(t, "SnapshotSourceEnd", err, true, testSnapshotSourceEndCalled)
}
func agentRPCTestSnapshotSourceEndPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
err := client.SnapshotSourceEnd(ctx, ti, testSnapshotSourceEndArgs)
expectRPCWrapLockActionPanic(t, err)
}
var testReserveForRestoreArgs = &actionnode.ReserveForRestoreArgs{
SrcTabletAlias: topo.TabletAlias{
Cell: "test",
Uid: 456,
},
}
var testReserveForRestoreCalled = false
func (fra *fakeRPCAgent) ReserveForRestore(ctx context.Context, args *actionnode.ReserveForRestoreArgs) error {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "ReserveForRestore args", args, testReserveForRestoreArgs)
testReserveForRestoreCalled = true
return nil
}
func agentRPCTestReserveForRestore(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
err := client.ReserveForRestore(ctx, ti, testReserveForRestoreArgs)
compareError(t, "ReserveForRestore", err, true, testReserveForRestoreCalled)
}
func agentRPCTestReserveForRestorePanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
err := client.ReserveForRestore(ctx, ti, testReserveForRestoreArgs)
expectRPCWrapLockActionPanic(t, err)
}
var testRestoreArgs = &actionnode.RestoreArgs{
SrcTabletAlias: topo.TabletAlias{
Cell: "jail1",
Uid: 890,
},
SrcFilePath: "source",
ParentAlias: topo.TabletAlias{
Cell: "jail2",
Uid: 901,
},
FetchConcurrency: 12,
FetchRetryCount: 678,
WasReserved: true,
DontWaitForSlaveStart: true,
}
var testRestoreCalled = false
func (fra *fakeRPCAgent) Restore(ctx context.Context, args *actionnode.RestoreArgs, logger logutil.Logger) error {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "Restore args", args, testRestoreArgs)
logStuff(logger, 10)
testRestoreCalled = true
return nil
}
func agentRPCTestRestore(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
logChannel, errFunc, err := client.Restore(ctx, ti, testRestoreArgs)
if err != nil {
t.Fatalf("Restore failed: %v", err)
}
compareLoggedStuff(t, "Restore", logChannel, 10)
err = errFunc()
compareError(t, "Restore", err, true, testRestoreCalled)
}
func agentRPCTestRestorePanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
logChannel, errFunc, err := client.Restore(ctx, ti, testRestoreArgs)
if err != nil {
t.Fatalf("Snapshot failed: %v", err)
}
if e, ok := <-logChannel; ok {
t.Fatalf("Unexpected Snapshot logs: %v", e)
}
err = errFunc()
expectRPCWrapLockActionPanic(t, err)
}
//
// RPC helpers
//
@@ -1400,10 +1252,6 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo,
// Backup / restore related methods
agentRPCTestBackup(ctx, t, client, ti)
agentRPCTestSnapshot(ctx, t, client, ti)
agentRPCTestSnapshotSourceEnd(ctx, t, client, ti)
agentRPCTestReserveForRestore(ctx, t, client, ti)
agentRPCTestRestore(ctx, t, client, ti)
//
// Tests panic handling everywhere now
@@ -1457,8 +1305,4 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo,
// Backup / restore related methods
agentRPCTestBackupPanic(ctx, t, client, ti)
agentRPCTestSnapshotPanic(ctx, t, client, ti)
agentRPCTestSnapshotSourceEndPanic(ctx, t, client, ti)
agentRPCTestReserveForRestorePanic(ctx, t, client, ti)
agentRPCTestRestorePanic(ctx, t, client, ti)
}

View file

@@ -302,32 +302,6 @@ func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topo.
}, nil
}
// Snapshot is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) Snapshot(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.SnapshotArgs) (<-chan *logutil.LoggerEvent, tmclient.SnapshotReplyFunc, error) {
logstream := make(chan *logutil.LoggerEvent, 10)
return logstream, func() (*actionnode.SnapshotReply, error) {
return &actionnode.SnapshotReply{}, nil
}, nil
}
// SnapshotSourceEnd is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) SnapshotSourceEnd(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.SnapshotSourceEndArgs) error {
return nil
}
// ReserveForRestore is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) ReserveForRestore(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.ReserveForRestoreArgs) error {
return nil
}
// Restore is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) Restore(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.RestoreArgs) (<-chan *logutil.LoggerEvent, tmclient.ErrFunc, error) {
logstream := make(chan *logutil.LoggerEvent, 10)
return logstream, func() error {
return nil
}, nil
}
//
// RPC related methods
//

View file

@@ -8,9 +8,7 @@ import (
"time"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
)
@@ -99,15 +97,6 @@ type BackupArgs struct {
Concurrency int
}
// gorpc doesn't support returning a streaming type during streaming
// and a final return value, so using structures with either one set.
// SnapshotStreamingReply has the two possible replies for Snapshot
type SnapshotStreamingReply struct {
Log *logutil.LoggerEvent
Result *actionnode.SnapshotReply
}
// TabletExternallyReparentedArgs has arguments for TabletExternallyReparented
type TabletExternallyReparentedArgs struct {
ExternalID string

View file

@@ -506,117 +506,6 @@ func (client *GoRPCTabletManagerClient) Backup(ctx context.Context, tablet *topo
}, nil
}
// Snapshot is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) Snapshot(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.SnapshotArgs) (<-chan *logutil.LoggerEvent, tmclient.SnapshotReplyFunc, error) {
var connectTimeout time.Duration
deadline, ok := ctx.Deadline()
if ok {
connectTimeout = deadline.Sub(time.Now())
if connectTimeout < 0 {
return nil, nil, timeoutError{fmt.Errorf("timeout connecting to TabletManager.Snapshot on %v", tablet.Alias)}
}
}
rpcClient, err := bsonrpc.DialHTTP("tcp", tablet.Addr(), connectTimeout, nil)
if err != nil {
return nil, nil, err
}
logstream := make(chan *logutil.LoggerEvent, 10)
rpcstream := make(chan *gorpcproto.SnapshotStreamingReply, 10)
result := &actionnode.SnapshotReply{}
c := rpcClient.StreamGo("TabletManager.Snapshot", sa, rpcstream)
interrupted := false
go func() {
for {
select {
case <-ctx.Done():
// context is done
interrupted = true
close(logstream)
rpcClient.Close()
return
case ssr, ok := <-rpcstream:
if !ok {
close(logstream)
rpcClient.Close()
return
}
if ssr.Log != nil {
logstream <- ssr.Log
}
if ssr.Result != nil {
*result = *ssr.Result
}
}
}
}()
return logstream, func() (*actionnode.SnapshotReply, error) {
// this is only called after streaming is done
if interrupted {
return nil, fmt.Errorf("TabletManager.Snapshot interrupted by context")
}
return result, c.Error
}, nil
}
// SnapshotSourceEnd is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) SnapshotSourceEnd(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.SnapshotSourceEndArgs) error {
return client.rpcCallTablet(ctx, tablet, actionnode.TabletActionSnapshotSourceEnd, args, &rpc.Unused{})
}
// ReserveForRestore is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) ReserveForRestore(ctx context.Context, tablet *topo.TabletInfo, args *actionnode.ReserveForRestoreArgs) error {
return client.rpcCallTablet(ctx, tablet, actionnode.TabletActionReserveForRestore, args, &rpc.Unused{})
}
// Restore is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) Restore(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.RestoreArgs) (<-chan *logutil.LoggerEvent, tmclient.ErrFunc, error) {
var connectTimeout time.Duration
deadline, ok := ctx.Deadline()
if ok {
connectTimeout = deadline.Sub(time.Now())
if connectTimeout < 0 {
return nil, nil, timeoutError{fmt.Errorf("timeout connecting to TabletManager.Restore on %v", tablet.Alias)}
}
}
rpcClient, err := bsonrpc.DialHTTP("tcp", tablet.Addr(), connectTimeout, nil)
if err != nil {
return nil, nil, err
}
logstream := make(chan *logutil.LoggerEvent, 10)
rpcstream := make(chan *logutil.LoggerEvent, 10)
c := rpcClient.StreamGo("TabletManager.Restore", sa, rpcstream)
interrupted := false
go func() {
for {
select {
case <-ctx.Done():
// context is done
interrupted = true
close(logstream)
rpcClient.Close()
return
case ssr, ok := <-rpcstream:
if !ok {
close(logstream)
rpcClient.Close()
return
}
logstream <- ssr
}
}
}()
return logstream, func() error {
// this is only called after streaming is done
if interrupted {
return fmt.Errorf("TabletManager.Restore interrupted by context")
}
return c.Error
}, nil
}
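For reference, this deleted client method was the only way to drive a restore remotely: it returned a log channel plus an error callback that is valid only after the stream ends. A sketch of a caller, assembled from the signatures and defaults visible elsewhere in this diff, follows; restoreOnTablet is a hypothetical helper, and the tmclient and context import paths are assumptions.

package sketch

import (
	"golang.org/x/net/context" // context import path assumed for this era of the tree

	log "github.com/golang/glog"
	"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
	"github.com/youtube/vitess/go/vt/tabletmanager/tmclient" // import path assumed
	"github.com/youtube/vitess/go/vt/topo"
)

// restoreOnTablet drives the (now deleted) streaming Restore RPC:
// stream the remote logs first, then read the final status from errFunc.
func restoreOnTablet(ctx context.Context, client tmclient.TabletManagerClient, ti *topo.TabletInfo, src, parent topo.TabletAlias) error {
	args := &actionnode.RestoreArgs{
		SrcTabletAlias:   src,
		SrcFilePath:      "default", // "default" is resolved to the snapshot manifest path on the server
		ParentAlias:      parent,
		FetchConcurrency: 3, // same defaults the mysqlctl restore command used
		FetchRetryCount:  3,
	}
	logChannel, errFunc, err := client.Restore(ctx, ti, args)
	if err != nil {
		return err
	}
	for e := range logChannel {
		log.Infof("restore on %v: %v", ti.Alias, e)
	}
	return errFunc()
}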
//
// RPC related methods
//

View file

@@ -494,84 +494,6 @@ func (tm *TabletManager) Backup(ctx context.Context, args *gorpcproto.BackupArgs
})
}
// Snapshot wraps RPCAgent.Snapshot
func (tm *TabletManager) Snapshot(ctx context.Context, args *actionnode.SnapshotArgs, sendReply func(interface{}) error) error {
ctx = callinfo.RPCWrapCallInfo(ctx)
return tm.agent.RPCWrapLockAction(ctx, actionnode.TabletActionSnapshot, args, nil, true, func() error {
// create a logger, send the result back to the caller
logger := logutil.NewChannelLogger(10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for e := range logger {
ssr := &gorpcproto.SnapshotStreamingReply{
Log: &e,
}
// Note we don't interrupt the loop here, as
// we still need to flush and finish the
// command, even if the channel to the client
// has been broken. We'll just keep trying to send.
sendReply(ssr)
}
wg.Done()
}()
sr, err := tm.agent.Snapshot(ctx, args, logger)
close(logger)
wg.Wait()
if err != nil {
return err
}
ssr := &gorpcproto.SnapshotStreamingReply{
Result: sr,
}
sendReply(ssr)
return nil
})
}
// SnapshotSourceEnd wraps RPCAgent.SnapshotSourceEnd
func (tm *TabletManager) SnapshotSourceEnd(ctx context.Context, args *actionnode.SnapshotSourceEndArgs, reply *rpc.Unused) error {
ctx = callinfo.RPCWrapCallInfo(ctx)
return tm.agent.RPCWrapLockAction(ctx, actionnode.TabletActionSnapshotSourceEnd, args, reply, true, func() error {
return tm.agent.SnapshotSourceEnd(ctx, args)
})
}
// ReserveForRestore wraps RPCAgent.ReserveForRestore
func (tm *TabletManager) ReserveForRestore(ctx context.Context, args *actionnode.ReserveForRestoreArgs, reply *rpc.Unused) error {
ctx = callinfo.RPCWrapCallInfo(ctx)
return tm.agent.RPCWrapLockAction(ctx, actionnode.TabletActionReserveForRestore, args, reply, true, func() error {
return tm.agent.ReserveForRestore(ctx, args)
})
}
// Restore wraps RPCAgent.Restore
func (tm *TabletManager) Restore(ctx context.Context, args *actionnode.RestoreArgs, sendReply func(interface{}) error) error {
ctx = callinfo.RPCWrapCallInfo(ctx)
return tm.agent.RPCWrapLockAction(ctx, actionnode.TabletActionRestore, args, nil, true, func() error {
// create a logger, send the result back to the caller
logger := logutil.NewChannelLogger(10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for e := range logger {
// Note we don't interrupt the loop here, as
// we still need to flush and finish the
// command, even if the channel to the client
// has been broken. We'll just keep trying to send.
sendReply(&e)
}
wg.Done()
}()
err := tm.agent.Restore(ctx, args, logger)
close(logger)
wg.Wait()
return err
})
}
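// Editor's sketch (not part of the original commit): the streaming
// scaffolding used by the Restore wrapper above, isolated for clarity
// (Snapshot does the same, additionally wrapping each event in a
// SnapshotStreamingReply). streamAction and its parameters are
// illustrative names only.
func streamAction(sendReply func(interface{}) error, action func(logutil.ChannelLogger) error) error {
	logger := logutil.NewChannelLogger(10)
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		for e := range logger {
			// Keep draining even if sendReply fails, so the action can
			// always finish and close the channel without blocking.
			sendReply(&e)
		}
		wg.Done()
	}()
	err := action(logger)
	close(logger)
	wg.Wait()
	return err
}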
// registration glue
func init() {

View file

@@ -1,157 +0,0 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
// This file handles the http server for snapshots, clones, ...
import (
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/cgzip"
vtenv "github.com/youtube/vitess/go/vt/env"
"github.com/youtube/vitess/go/vt/mysqlctl"
)
// HttpHandleSnapshots handles the serving of files from the local tablet
func HttpHandleSnapshots(mycnf *mysqlctl.Mycnf, uid uint32) {
// make a list of paths we can serve HTTP traffic from.
// we don't resolve them here to real paths, as they might not exist yet
snapshotDir := mysqlctl.SnapshotDir(uid)
allowedPaths := []string{
path.Join(vtenv.VtDataRoot(), "data"),
mysqlctl.TabletDir(uid),
mysqlctl.SnapshotDir(uid),
mycnf.DataDir,
mycnf.InnodbDataHomeDir,
mycnf.InnodbLogGroupHomeDir,
}
// NOTE: trailing slash in pattern means we handle all paths with this prefix
http.Handle(mysqlctl.SnapshotURLPath+"/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleSnapshot(w, r, snapshotDir, allowedPaths)
}))
}
// serve an individual file request
func handleSnapshot(rw http.ResponseWriter, req *http.Request, snapshotDir string, allowedPaths []string) {
// if we get any error, we'll try to write a server error
// (it will fail if the header has already been written, but at least
// we won't crash vttablet)
defer func() {
if x := recover(); x != nil {
log.Errorf("vttablet http server panic: %v", x)
http.Error(rw, fmt.Sprintf("500 internal server error: %v", x), http.StatusInternalServerError)
}
}()
// /snapshot must be rewritten to the actual location of the snapshot.
relative, err := filepath.Rel(mysqlctl.SnapshotURLPath, req.URL.Path)
if err != nil {
log.Errorf("bad snapshot relative path %v %v", req.URL.Path, err)
http.Error(rw, "400 bad request", http.StatusBadRequest)
return
}
// Make sure that realPath is absolute and resolve any escaping from
// snapshotDir through a symlink.
realPath, err := filepath.Abs(path.Join(snapshotDir, relative))
if err != nil {
log.Errorf("bad snapshot absolute path %v %v", req.URL.Path, err)
http.Error(rw, "400 bad request", http.StatusBadRequest)
return
}
realPath, err = filepath.EvalSymlinks(realPath)
if err != nil {
log.Errorf("bad snapshot symlink eval %v %v", req.URL.Path, err)
http.Error(rw, "400 bad request", http.StatusBadRequest)
return
}
// Resolve all the possible roots and make sure we're serving
// from one of them
for _, allowedPath := range allowedPaths {
// eval the symlinks of the allowed path
allowedPath, err := filepath.EvalSymlinks(allowedPath)
if err != nil {
continue
}
if strings.HasPrefix(realPath, allowedPath) {
sendFile(rw, req, realPath)
return
}
}
log.Errorf("bad snapshot real path %v %v", req.URL.Path, realPath)
http.Error(rw, "400 bad request", http.StatusBadRequest)
}
// custom function to serve files
func sendFile(rw http.ResponseWriter, req *http.Request, path string) {
log.Infof("serve %v %v", req.URL.Path, path)
file, err := os.Open(path)
if err != nil {
http.NotFound(rw, req)
return
}
defer file.Close()
fileinfo, err := file.Stat()
if err != nil {
http.NotFound(rw, req)
return
}
// for directories, or for files smaller than 1k, use the standard library handler
if fileinfo.Mode().IsDir() || fileinfo.Size() < 1024 {
http.ServeFile(rw, req, path)
return
}
// supports If-Modified-Since header
if t, err := time.Parse(http.TimeFormat, req.Header.Get("If-Modified-Since")); err == nil && fileinfo.ModTime().Before(t.Add(1*time.Second)) {
rw.WriteHeader(http.StatusNotModified)
return
}
// support Accept-Encoding header
var writer io.Writer = rw
var reader io.Reader = file
if !strings.HasSuffix(path, ".gz") {
ae := req.Header.Get("Accept-Encoding")
if strings.Contains(ae, "gzip") {
gz, err := cgzip.NewWriterLevel(rw, cgzip.Z_BEST_SPEED)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Encoding", "gzip")
defer gz.Close()
writer = gz
}
}
// add content-length if we know it
if writer == rw && reader == file {
rw.Header().Set("Content-Length", fmt.Sprintf("%v", fileinfo.Size()))
}
// and just copy content out
rw.Header().Set("Last-Modified", fileinfo.ModTime().UTC().Format(http.TimeFormat))
rw.WriteHeader(http.StatusOK)
if _, err := io.Copy(writer, reader); err != nil {
log.Warningf("transfer failed %v: %v", path, err)
}
}
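// Editor's sketch (not part of the original commit): a minimal client-side
// fetch against the handler above. It assumes the file is addressed as
// mysqlctl.SnapshotURLPath plus its path relative to the snapshot
// directory; host and relativePath are illustrative values, and
// compress/gzip and io/ioutil are needed on top of the imports above.
func fetchSnapshotFile(host, relativePath string) ([]byte, error) {
	url := "http://" + host + mysqlctl.SnapshotURLPath + "/" + relativePath
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	// Setting Accept-Encoding explicitly disables the transport's
	// transparent decompression, so the gzip case is handled by hand.
	req.Header.Set("Accept-Encoding", "gzip")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %v for %v", resp.Status, url)
	}
	var reader io.Reader = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		gz, err := gzip.NewReader(resp.Body)
		if err != nil {
			return nil, err
		}
		defer gz.Close()
		reader = gz
	}
	return ioutil.ReadAll(reader)
}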

View file

@@ -24,9 +24,6 @@ var tabletManagerProtocol = flag.String("tablet_manager_protocol", "bson", "the
// ErrFunc is used by streaming RPCs that don't return a specific result
type ErrFunc func() error
// SnapshotReplyFunc is used by Snapshot to return result and error
type SnapshotReplyFunc func() (*actionnode.SnapshotReply, error)
// TabletManagerClient defines the interface used to talk to a remote tablet
type TabletManagerClient interface {
//
@@ -190,18 +187,6 @@ type TabletManagerClient interface {
// Backup creates a database backup
Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutil.LoggerEvent, ErrFunc, error)
// Snapshot takes a database snapshot
Snapshot(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.SnapshotArgs) (<-chan *logutil.LoggerEvent, SnapshotReplyFunc, error)
// SnapshotSourceEnd restarts the mysql server
SnapshotSourceEnd(ctx context.Context, tablet *topo.TabletInfo, ssea *actionnode.SnapshotSourceEndArgs) error
// ReserveForRestore will prepare a server for restore
ReserveForRestore(ctx context.Context, tablet *topo.TabletInfo, rfra *actionnode.ReserveForRestoreArgs) error
// Restore restores a database snapshot
Restore(ctx context.Context, tablet *topo.TabletInfo, sa *actionnode.RestoreArgs) (<-chan *logutil.LoggerEvent, ErrFunc, error)
//
// RPC related methods
//

View file

@@ -107,21 +107,6 @@ var commands = []commandGroup{
command{"Backup", commandBackup,
"[-concurrency=4] <tablet alias>",
"Stop mysqld and copy data to BackupStorage."},
command{"Snapshot", commandSnapshot,
"[-force] [-server-mode] [-concurrency=4] <tablet alias>",
"Stop mysqld and copy compressed data aside."},
command{"SnapshotSourceEnd", commandSnapshotSourceEnd,
"[-slave-start] [-read-write] <tablet alias> <original tablet type>",
"Restart Mysql and restore original server type." +
"Valid <tablet type>:\n" +
" " + strings.Join(topo.MakeStringTypeList(topo.AllTabletTypes), " ")},
command{"Restore", commandRestore,
"[-fetch-concurrency=3] [-fetch-retry-count=3] [-dont-wait-for-slave-start] <src tablet alias> <src manifest file> <dst tablet alias> [<new master tablet alias>]",
"Copy the given snaphot from the source tablet and restart replication to the new master path (or uses the <src tablet path> if not specified). If <src manifest file> is 'default', uses the default value.\n" +
"NOTE: This does not wait for replication to catch up. The destination tablet must be 'idle' to begin with. It will transition to 'spare' once the restore is complete."},
command{"Clone", commandClone,
"[-force] [-concurrency=4] [-fetch-concurrency=3] [-fetch-retry-count=3] [-server-mode] <src tablet alias> <dst tablet alias> ...",
"This performs Snapshot and then Restore on all the targets in parallel. The advantage of having separate actions is that one snapshot can be used for many restores, and it's then easier to spread them over time."},
command{"ExecuteHook", commandExecuteHook,
"<tablet alias> <hook name> [<param1=value1> <param2=value2> ...]",
"This runs the specified hook on the given tablet."},
@@ -931,110 +916,6 @@ func commandBackup(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
return errFunc()
}
func commandSnapshotSourceEnd(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
slaveStartRequired := subFlags.Bool("slave-start", false, "will restart replication")
readWrite := subFlags.Bool("read-write", false, "will make the server read-write")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("action SnapshotSourceEnd requires <tablet alias> <original server type>")
}
tabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
tabletType, err := parseTabletType(subFlags.Arg(1), topo.AllTabletTypes)
if err != nil {
return err
}
return wr.SnapshotSourceEnd(ctx, tabletAlias, *slaveStartRequired, !(*readWrite), tabletType)
}
func commandSnapshot(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
force := subFlags.Bool("force", false, "will force the snapshot for a master, and turn it into a backup")
serverMode := subFlags.Bool("server-mode", false, "will symlink the data files and leave mysqld stopped")
concurrency := subFlags.Int("concurrency", 4, "how many compression/checksum jobs to run simultaneously")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("action Snapshot requires <tablet alias>")
}
tabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
sr, originalType, err := wr.Snapshot(ctx, tabletAlias, *force, *concurrency, *serverMode)
if err == nil {
log.Infof("Manifest: %v", sr.ManifestPath)
log.Infof("ParentAlias: %v", sr.ParentAlias)
if *serverMode {
log.Infof("SlaveStartRequired: %v", sr.SlaveStartRequired)
log.Infof("ReadOnly: %v", sr.ReadOnly)
log.Infof("OriginalType: %v", originalType)
}
}
return err
}
func commandRestore(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
dontWaitForSlaveStart := subFlags.Bool("dont-wait-for-slave-start", false, "won't wait for replication to start (useful when restoring from a snapshot source that is the replication master)")
fetchConcurrency := subFlags.Int("fetch-concurrency", 3, "how many files to fetch simultaneously")
fetchRetryCount := subFlags.Int("fetch-retry-count", 3, "how many times to retry a failed transfer")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 3 && subFlags.NArg() != 4 {
return fmt.Errorf("action Restore requires <src tablet alias> <src manifest path> <dst tablet alias> [<new master tablet alias>]")
}
srcTabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
dstTabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(2))
if err != nil {
return err
}
parentAlias := srcTabletAlias
if subFlags.NArg() == 4 {
parentAlias, err = topo.ParseTabletAliasString(subFlags.Arg(3))
if err != nil {
return err
}
}
return wr.Restore(ctx, srcTabletAlias, subFlags.Arg(1), dstTabletAlias, parentAlias, *fetchConcurrency, *fetchRetryCount, false, *dontWaitForSlaveStart)
}
func commandClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
force := subFlags.Bool("force", false, "will force the snapshot for a master, and turn it into a backup")
concurrency := subFlags.Int("concurrency", 4, "how many compression/checksum jobs to run simultaneously")
fetchConcurrency := subFlags.Int("fetch-concurrency", 3, "how many files to fetch simultaneously")
fetchRetryCount := subFlags.Int("fetch-retry-count", 3, "how many times to retry a failed transfer")
serverMode := subFlags.Bool("server-mode", false, "will keep the snapshot server offline to serve DB files directly")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() < 2 {
return fmt.Errorf("action Clone requires <src tablet alias> <dst tablet alias> [...]")
}
srcTabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
dstTabletAliases := make([]topo.TabletAlias, subFlags.NArg()-1)
for i := 1; i < subFlags.NArg(); i++ {
dstTabletAliases[i-1], err = topo.ParseTabletAliasString(subFlags.Arg(i))
if err != nil {
return err
}
}
return wr.Clone(ctx, srcTabletAlias, dstTabletAliases, *force, *concurrency, *fetchConcurrency, *fetchRetryCount, *serverMode)
}
func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
maxRows := subFlags.Int("max_rows", 10000, "maximum number of rows to allow in reset")
wantFields := subFlags.Bool("want_fields", false, "also get the field names")

View file

@@ -1,235 +0,0 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wrangler
import (
"fmt"
"sync"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"golang.org/x/net/context"
)
// Snapshot takes a tablet snapshot.
//
// forceMasterSnapshot: Normally a master is not a viable tablet to snapshot.
// However, there are degenerate cases where you need to override this, for
// instance the initial clone of a new master.
//
// serverMode: if specified, the server will stop its mysqld, and be
// ready to serve the data files directly. Slaves can just download
// these and use them directly. Call SnapshotSourceEnd to return into
// serving mode. If not specified, the server will create an archive
// of the files, store them locally, and restart.
//
// If error is nil, returns the SnapshotReply from the remote host,
// and the original type the server was before the snapshot.
func (wr *Wrangler) Snapshot(ctx context.Context, tabletAlias topo.TabletAlias, forceMasterSnapshot bool, snapshotConcurrency int, serverMode bool) (*actionnode.SnapshotReply, topo.TabletType, error) {
// read the tablet to be able to RPC to it, and also to get its
// original type
ti, err := wr.ts.GetTablet(tabletAlias)
if err != nil {
return nil, "", err
}
originalType := ti.Tablet.Type
// execute the remote action, log the results, save the error
args := &actionnode.SnapshotArgs{
Concurrency: snapshotConcurrency,
ServerMode: serverMode,
ForceMasterSnapshot: forceMasterSnapshot,
}
logStream, errFunc, err := wr.tmc.Snapshot(ctx, ti, args)
if err != nil {
return nil, "", err
}
for e := range logStream {
wr.Logger().Infof("Snapshot(%v): %v", tabletAlias, e)
}
reply, err := errFunc()
return reply, originalType, err
}
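// Editor's sketch (not part of the original commit): the server-mode
// lifecycle described in the comment above — take a snapshot in server
// mode, let destinations copy the files, then return the source to serving
// with SnapshotSourceEnd, passing the same arguments Clone passes further
// down. The function name and the fixed concurrency of 4 are illustrative.
func snapshotServerMode(ctx context.Context, wr *Wrangler, src topo.TabletAlias) error {
	sr, originalType, err := wr.Snapshot(ctx, src, false /* forceMasterSnapshot */, 4, true /* serverMode */)
	if err != nil {
		return err
	}
	// ... destinations would Restore from sr.ManifestPath here ...
	return wr.SnapshotSourceEnd(ctx, src, sr.SlaveStartRequired, sr.ReadOnly, originalType)
}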
// SnapshotSourceEnd will change the tablet back to its original type
// once it's done serving backups.
func (wr *Wrangler) SnapshotSourceEnd(ctx context.Context, tabletAlias topo.TabletAlias, slaveStartRequired, readWrite bool, originalType topo.TabletType) (err error) {
var ti *topo.TabletInfo
ti, err = wr.ts.GetTablet(tabletAlias)
if err != nil {
return
}
args := &actionnode.SnapshotSourceEndArgs{
SlaveStartRequired: slaveStartRequired,
ReadOnly: !readWrite,
OriginalType: originalType,
}
return wr.tmc.SnapshotSourceEnd(ctx, ti, args)
}
// ReserveForRestore will make sure a tablet is ready to be used as a restore
// target.
func (wr *Wrangler) ReserveForRestore(ctx context.Context, srcTabletAlias, dstTabletAlias topo.TabletAlias) (err error) {
// read our current tablet, verify its state before sending it
// to the tablet itself
tablet, err := wr.ts.GetTablet(dstTabletAlias)
if err != nil {
return err
}
if tablet.Type != topo.TYPE_IDLE {
return fmt.Errorf("expected idle type, not %v: %v", tablet.Type, dstTabletAlias)
}
args := &actionnode.ReserveForRestoreArgs{
SrcTabletAlias: srcTabletAlias,
}
return wr.tmc.ReserveForRestore(ctx, tablet, args)
}
// UnreserveForRestore switches the tablet back to its original state;
// the restore won't happen.
func (wr *Wrangler) UnreserveForRestore(ctx context.Context, dstTabletAlias topo.TabletAlias) (err error) {
tablet, err := wr.ts.GetTablet(dstTabletAlias)
if err != nil {
return err
}
err = topo.DeleteTabletReplicationData(wr.ts, tablet.Tablet)
if err != nil {
return err
}
return wr.ChangeType(ctx, tablet.Alias, topo.TYPE_IDLE, false)
}
// Restore actually performs the restore action on a tablet.
func (wr *Wrangler) Restore(ctx context.Context, srcTabletAlias topo.TabletAlias, srcFilePath string, dstTabletAlias, parentAlias topo.TabletAlias, fetchConcurrency, fetchRetryCount int, wasReserved, dontWaitForSlaveStart bool) error {
// read our current tablet, verify its state before sending it
// to the tablet itself
tablet, err := wr.ts.GetTablet(dstTabletAlias)
if err != nil {
return err
}
if wasReserved {
if tablet.Type != topo.TYPE_RESTORE {
return fmt.Errorf("expected restore type, not %v: %v", tablet.Type, dstTabletAlias)
}
} else {
if tablet.Type != topo.TYPE_IDLE {
return fmt.Errorf("expected idle type, not %v: %v", tablet.Type, dstTabletAlias)
}
}
// update the shard record if we need to, to update Cells
srcTablet, err := wr.ts.GetTablet(srcTabletAlias)
if err != nil {
return err
}
si, err := wr.ts.GetShard(srcTablet.Keyspace, srcTablet.Shard)
if err != nil {
return fmt.Errorf("Cannot read shard: %v", err)
}
if err := wr.updateShardCellsAndMaster(ctx, si, tablet.Alias, topo.TYPE_SPARE, false); err != nil {
return err
}
// do the work
args := &actionnode.RestoreArgs{
SrcTabletAlias: srcTabletAlias,
SrcFilePath: srcFilePath,
ParentAlias: parentAlias,
FetchConcurrency: fetchConcurrency,
FetchRetryCount: fetchRetryCount,
WasReserved: wasReserved,
DontWaitForSlaveStart: dontWaitForSlaveStart,
}
logStream, errFunc, err := wr.tmc.Restore(ctx, tablet, args)
if err != nil {
return err
}
for e := range logStream {
wr.Logger().Infof("Restore(%v): %v", dstTabletAlias, e)
}
if err := errFunc(); err != nil {
return err
}
// Restore moves us into the replication graph as a
// spare. There are no consequences to the replication or
// serving graphs, so no rebuild required.
return nil
}
// UnreserveForRestoreMulti calls UnreserveForRestore on all targets.
func (wr *Wrangler) UnreserveForRestoreMulti(ctx context.Context, dstTabletAliases []topo.TabletAlias) {
for _, dstTabletAlias := range dstTabletAliases {
ufrErr := wr.UnreserveForRestore(ctx, dstTabletAlias)
if ufrErr != nil {
wr.Logger().Errorf("Failed to UnreserveForRestore destination tablet after failed source snapshot: %v", ufrErr)
} else {
wr.Logger().Infof("Un-reserved %v", dstTabletAlias)
}
}
}
// Clone will do all the necessary actions to copy all the data from a
// source to a set of destinations.
func (wr *Wrangler) Clone(ctx context.Context, srcTabletAlias topo.TabletAlias, dstTabletAliases []topo.TabletAlias, forceMasterSnapshot bool, snapshotConcurrency, fetchConcurrency, fetchRetryCount int, serverMode bool) error {
// make sure the destination can be restored into (otherwise
// there is no point in taking the snapshot in the first place),
// and reserve it.
reserved := make([]topo.TabletAlias, 0, len(dstTabletAliases))
for _, dstTabletAlias := range dstTabletAliases {
err := wr.ReserveForRestore(ctx, srcTabletAlias, dstTabletAlias)
if err != nil {
wr.UnreserveForRestoreMulti(ctx, reserved)
return err
}
reserved = append(reserved, dstTabletAlias)
wr.Logger().Infof("Successfully reserved %v for restore", dstTabletAlias)
}
// take the snapshot, or put the server in SnapshotSource mode
// srcFilePath, parentAlias, slaveStartRequired, readWrite
sr, originalType, err := wr.Snapshot(ctx, srcTabletAlias, forceMasterSnapshot, snapshotConcurrency, serverMode)
if err != nil {
// The snapshot failed so un-reserve the destinations and return
wr.UnreserveForRestoreMulti(ctx, reserved)
return err
}
// try to restore the snapshot
// In serverMode, and in the case where we're replicating from
// the master, we can't wait for replication, as the master is down.
wg := sync.WaitGroup{}
rec := concurrency.FirstErrorRecorder{}
for _, dstTabletAlias := range dstTabletAliases {
wg.Add(1)
go func(dstTabletAlias topo.TabletAlias) {
e := wr.Restore(ctx, srcTabletAlias, sr.ManifestPath, dstTabletAlias, sr.ParentAlias, fetchConcurrency, fetchRetryCount, true, serverMode && originalType == topo.TYPE_MASTER)
rec.RecordError(e)
wg.Done()
}(dstTabletAlias)
}
wg.Wait()
err = rec.Error()
// in any case, fix the server
if serverMode {
resetErr := wr.SnapshotSourceEnd(ctx, srcTabletAlias, sr.SlaveStartRequired, sr.ReadOnly, originalType)
if resetErr != nil {
if err == nil {
// If there is no other error, this matters.
err = resetErr
} else {
// In the context of a larger failure, just log a note to cleanup.
wr.Logger().Errorf("Failed to reset snapshot source: %v - vtctl SnapshotSourceEnd is required", resetErr)
}
}
}
return err
}
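// Editor's sketch (not part of the original commit): driving the Clone flow
// above from Go, with the same defaults the deleted vtctl Clone command
// used (concurrency=4, fetch-concurrency=3, fetch-retry-count=3). The
// function name and the pre-existing wr, src and dsts values are
// illustrative assumptions.
func cloneOnce(ctx context.Context, wr *Wrangler, src topo.TabletAlias, dsts []topo.TabletAlias) error {
	return wr.Clone(ctx, src, dsts, false /* forceMasterSnapshot */, 4, 3, 3, false /* serverMode */)
}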

View file

@@ -24,10 +24,7 @@ var (
)
// Wrangler manages complex actions on the topology, like reparents,
// snapshots, restores, ...
//
// FIXME(alainjobart) take the context out of this structure.
// We want the context to come from the outside on every call.
// backups, resharding, ...
//
// Multiple goroutines can use the same Wrangler at the same time,
// provided they want to share the same logger / topo server / lock timeout.