Mirror of https://github.com/github/vitess-gh.git
Add RestoreFromBackup command to vtctl.
When the command is executed, vttablet completely deletes the existing data and then replaces it from the latest backup. The sequence of actions is basically the same one vttablet already uses to set up a freshly started replica. The mysqlctl interface is also extended with a ReinitConfig function, which vttablet uses to change the server_id of the replica before restoring it from backup. That's necessary to avoid a situation where the restored replica skips transactions in the replication stream that carry the same server_id. The new RestoreFromBackup command will be used later in the implementation of an automated pivot. This change doesn't add any tests for the new functionality because I didn't find a place where I could add them; please tell me if there is a place that I missed. Meanwhile, I tested this manually on the example Kubernetes setup.
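For reference, a minimal usage sketch of the two new entry points. The tablet alias, UID, and any connection flags are placeholders rather than values taken from this change, and the -tablet_uid flag name is assumed from mysqlctl's existing options:

    # Wipe the tablet's data and restore it in place from the latest backup:
    vtctl RestoreFromBackup <tablet alias>

    # Regenerate my.cnf with a fresh random server_id (what vttablet does before restoring):
    mysqlctl -tablet_uid <uid> reinit_config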
This commit is contained in:
Parent: 7f0d5ec407
Commit: 66841f7500
@@ -1704,6 +1704,7 @@ Blocks until the specified shard has caught up with the filtered replication of
* [Ping](#ping)
* [RefreshState](#refreshstate)
* [ReparentTablet](#reparenttablet)
* [RestoreFromBackup](#restorefrombackup)
* [RunHealthCheck](#runhealthcheck)
* [SetReadOnly](#setreadonly)
* [SetReadWrite](#setreadwrite)

@@ -1995,6 +1996,19 @@ Reparent a tablet to the current master in the shard. This only works if the cur
* action <code><ReparentTablet></code> requires <code><tablet alias></code> This error occurs if the command is not called with exactly one argument.

### RestoreFromBackup

Stops mysqld and restores the data from the latest backup.

#### Example

<pre class="command-example">RestoreFromBackup <tablet alias></pre>

#### Errors

* The <code><RestoreFromBackup></code> command requires the <code><tablet alias></code> argument. This error occurs if the command is not called with exactly one argument.

### RunHealthCheck

Runs a health check on a remote tablet.

@@ -58,6 +58,20 @@ func initCmd(subFlags *flag.FlagSet, args []string) error {
return nil
}

func reinitConfigCmd(subFlags *flag.FlagSet, args []string) error {
// There ought to be an existing my.cnf, so use it to find mysqld.
mysqld, err := mysqlctl.OpenMysqld(uint32(*tabletUID), dbconfigFlags)
if err != nil {
return fmt.Errorf("failed to find mysql config: %v", err)
}
defer mysqld.Close()

if err := mysqld.ReinitConfig(context.TODO()); err != nil {
return fmt.Errorf("failed to reinit mysql config: %v", err)
}
return nil
}

func shutdownCmd(subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", 5*time.Minute, "how long to wait for shutdown")
subFlags.Parse(args)

@@ -161,6 +175,8 @@ type command struct {
var commands = []command{
{"init", initCmd, "[-wait_time=5m] [-init_db_sql_file=]",
"Initalizes the directory structure and starts mysqld"},
{"reinit_config", reinitConfigCmd, "",
"Reinitalizes my.cnf file with new server_id"},
{"teardown", teardownCmd, "[-wait_time=5m] [-force]",
"Shuts mysqld down, and removes the directory"},
{"start", startCmd, "[-wait_time=5m]",

@@ -765,3 +765,7 @@ func (itmc *internalTabletManagerClient) PromoteSlave(ctx context.Context, table
func (itmc *internalTabletManagerClient) Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int) (logutil.EventStream, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) RestoreFromBackup(ctx context.Context, tablet *topodatapb.Tablet) (logutil.EventStream, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

@@ -582,9 +582,9 @@ func removeExistingFiles(cnf *Mycnf) error {
// Restore is the main entry point for backup restore. If there is no
// appropriate backup on the BackupStorage, Restore logs an error
// and returns ErrNoBackup. Any other error is returned.
func Restore(ctx context.Context, mysqld MysqlDaemon, dir string, restoreConcurrency int, hookExtraEnv map[string]string) (replication.Position, error) {
func Restore(ctx context.Context, mysqld MysqlDaemon, dir string, restoreConcurrency int, hookExtraEnv map[string]string, logger logutil.Logger, deleteBeforeRestore bool) (replication.Position, error) {
// find the right backup handle: most recent one, with a MANIFEST
log.Infof("Restore: looking for a suitable backup to restore")
logger.Infof("Restore: looking for a suitable backup to restore")
bs, err := backupstorage.GetBackupStorage()
if err != nil {
return replication.Position{}, err

@@ -612,54 +612,62 @@ func Restore(ctx context.Context, mysqld MysqlDaemon, dir string, restoreConcurr
continue
}

log.Infof("Restore: found backup %v %v to restore with %v files", bh.Directory(), bh.Name(), len(bm.FileEntries))
logger.Infof("Restore: found backup %v %v to restore with %v files", bh.Directory(), bh.Name(), len(bm.FileEntries))
break
}
if toRestore < 0 {
log.Errorf("No backup to restore on BackupStorage for directory %v", dir)
logger.Errorf("No backup to restore on BackupStorage for directory %v", dir)
return replication.Position{}, ErrNoBackup
}

log.Infof("Restore: checking no existing data is present")
ok, err := checkNoDB(ctx, mysqld)
if err != nil {
return replication.Position{}, err
}
if !ok {
return replication.Position{}, ErrExistingDB
if !deleteBeforeRestore {
logger.Infof("Restore: checking no existing data is present")
ok, err := checkNoDB(ctx, mysqld)
if err != nil {
return replication.Position{}, err
}
if !ok {
return replication.Position{}, ErrExistingDB
}
}

log.Infof("Restore: shutdown mysqld")
logger.Infof("Restore: shutdown mysqld")
err = mysqld.Shutdown(ctx, true)
if err != nil {
return replication.Position{}, err
}

log.Infof("Restore: deleting existing files")
logger.Infof("Restore: deleting existing files")
if err := removeExistingFiles(mysqld.Cnf()); err != nil {
return replication.Position{}, err
}

log.Infof("Restore: copying all files")
logger.Infof("Restore: reinit config file")
err = mysqld.ReinitConfig(ctx)
if err != nil {
return replication.Position{}, err
}

logger.Infof("Restore: copying all files")
if err := restoreFiles(mysqld.Cnf(), bh, bm.FileEntries, restoreConcurrency); err != nil {
return replication.Position{}, err
}

// mysqld needs to be running in order for mysql_upgrade to work.
log.Infof("Restore: starting mysqld for mysql_upgrade")
logger.Infof("Restore: starting mysqld for mysql_upgrade")
err = mysqld.Start(ctx)
if err != nil {
return replication.Position{}, err
}

log.Infof("Restore: running mysql_upgrade")
logger.Infof("Restore: running mysql_upgrade")
if err := mysqld.RunMysqlUpgrade(); err != nil {
return replication.Position{}, fmt.Errorf("mysql_upgrade failed: %v", err)
}

// The MySQL manual recommends restarting mysqld after running mysql_upgrade,
// so that any changes made to system tables take effect.
log.Infof("Restore: restarting mysqld after mysql_upgrade")
logger.Infof("Restore: restarting mysqld after mysql_upgrade")
err = mysqld.Shutdown(ctx, true)
if err != nil {
return replication.Position{}, err

@@ -9,9 +9,7 @@ This file contains common functions for cmd/mysqlctl and cmd/mysqlctld.
package mysqlctl

import (
"crypto/rand"
"fmt"
"math/big"

"github.com/youtube/vitess/go/vt/dbconfigs"
)

@@ -20,6 +18,7 @@ import (
// installation that hasn't been set up yet. This will generate a new my.cnf
// from scratch and use that to call NewMysqld().
func CreateMysqld(tabletUID uint32, mysqlSocket string, mysqlPort int32, dbconfigFlags dbconfigs.DBConfigFlag) (*Mysqld, error) {
mycnf := NewMycnf(tabletUID, mysqlPort)
// Choose a random MySQL server-id, since this is a fresh data dir.
// We don't want to use the tablet UID as the MySQL server-id,
// because reusing server-ids is not safe.

@@ -29,11 +28,9 @@ func CreateMysqld(tabletUID uint32, mysqlSocket string, mysqlPort int32, dbconfi
// server-id as before, and if this tablet was recently a master, then it can
// lose data by skipping binlog events due to replicate-same-server-id=FALSE,
// which is the default setting.
mysqlServerID, err := randomServerID()
if err != nil {
if err := mycnf.RandomizeMysqlServerID(); err != nil {
return nil, fmt.Errorf("couldn't generate random MySQL server_id: %v", err)
}
mycnf := NewMycnf(tabletUID, mysqlServerID, mysqlPort)
if mysqlSocket != "" {
mycnf.SocketFile = mysqlSocket
}

@@ -62,21 +59,3 @@ func OpenMysqld(tabletUID uint32, dbconfigFlags dbconfigs.DBConfigFlag) (*Mysqld

return NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnParams, &dbcfgs.Repl), nil
}

// randomServerID generates a random MySQL server_id.
//
// The returned ID will be in the range [100, 2^32).
// It avoids 0 because that's reserved for mysqlbinlog dumps.
// It also avoids 1-99 because low numbers are used for fake slave connections.
// See NewSlaveConnection() in slave_connection.go for more on that.
func randomServerID() (uint32, error) {
// rand.Int(_, max) returns a value in the range [0, max).
bigN, err := rand.Int(rand.Reader, big.NewInt(1<<32-100))
if err != nil {
return 0, err
}
n := bigN.Uint64()
// n is in the range [0, 2^32 - 100).
// Add back 100 to put it in the range [100, 2^32).
return uint32(n + 100), nil
}

@@ -60,6 +60,12 @@ func (c *client) RunMysqlUpgrade(ctx context.Context) error {
return err
}

// ReinitConfig is part of the MysqlctlClient interface.
func (c *client) ReinitConfig(ctx context.Context) error {
_, err := c.c.ReinitConfig(ctx, &mysqlctlpb.ReinitConfigRequest{})
return err
}

// Close is part of the MysqlctlClient interface.
func (c *client) Close() {
c.cc.Close()

@@ -37,6 +37,11 @@ func (s *server) RunMysqlUpgrade(ctx context.Context, request *mysqlctlpb.RunMys
return &mysqlctlpb.RunMysqlUpgradeResponse{}, s.mysqld.RunMysqlUpgrade()
}

// ReinitConfig implements the server side of the MysqlctlClient interface.
func (s *server) ReinitConfig(ctx context.Context, request *mysqlctlpb.ReinitConfigRequest) (*mysqlctlpb.ReinitConfigResponse, error) {
return &mysqlctlpb.ReinitConfigResponse{}, s.mysqld.ReinitConfig(ctx)
}

// StartServer registers the Server for RPCs.
func StartServer(s *grpc.Server, mysqld *mysqlctl.Mysqld) {
mysqlctlpb.RegisterMysqlCtlServer(s, &server{mysqld})

@@ -8,8 +8,10 @@ package mysqlctl

import (
"bytes"
"crypto/rand"
"fmt"
"io/ioutil"
"math/big"
"path"
"text/template"

@@ -43,11 +45,11 @@ const (
// uid is a unique id for a particular tablet - it must be unique within the
// tabletservers deployed within a keyspace, lest there be collisions on disk.
// mysqldPort needs to be unique per instance per machine.
func NewMycnf(tabletUID, mysqlServerID uint32, mysqlPort int32) *Mycnf {
func NewMycnf(tabletUID uint32, mysqlPort int32) *Mycnf {
cnf := new(Mycnf)
cnf.path = mycnfFile(tabletUID)
tabletDir := TabletDir(tabletUID)
cnf.ServerID = mysqlServerID
cnf.ServerID = tabletUID
cnf.MysqlPort = mysqlPort
cnf.DataDir = path.Join(tabletDir, dataDir)
cnf.InnodbDataHomeDir = path.Join(tabletDir, innodbDataSubdir)

@@ -126,3 +128,22 @@ func (cnf *Mycnf) fillMycnfTemplate(tmplSrc string) (string, error) {
}
return mycnfData.String(), nil
}

// RandomizeMysqlServerID generates a random MySQL server_id.
//
// The value assigned to ServerID will be in the range [100, 2^32).
// It avoids 0 because that's reserved for mysqlbinlog dumps.
// It also avoids 1-99 because low numbers are used for fake slave connections.
// See NewSlaveConnection() in slave_connection.go for more on that.
func (cnf *Mycnf) RandomizeMysqlServerID() error {
// rand.Int(_, max) returns a value in the range [0, max).
bigN, err := rand.Int(rand.Reader, big.NewInt(1<<32-100))
if err != nil {
return err
}
n := bigN.Uint64()
// n is in the range [0, 2^32 - 100).
// Add back 100 to put it in the range [100, 2^32).
cnf.ServerID = uint32(n + 100)
return nil
}

@@ -22,7 +22,11 @@ func TestMycnf(t *testing.T) {
dbaConfig := dbconfigs.DefaultDBConfigs.Dba
appConfig := dbconfigs.DefaultDBConfigs.App.ConnParams
replConfig := dbconfigs.DefaultDBConfigs.Repl
tablet0 := NewMysqld("Dba", "App", NewMycnf(11111, 22222, 6802), &dbaConfig, &appConfig, &replConfig)
cnf := NewMycnf(11111, 6802)
// Assigning ServerID to be different from tablet UID to make sure that there are no
// assumptions in the code that those IDs are the same.
cnf.ServerID = 22222
tablet0 := NewMysqld("Dba", "App", cnf, &dbaConfig, &appConfig, &replConfig)
defer tablet0.Close()
root, err := env.VtRoot()
if err != nil {

@@ -32,6 +32,7 @@ type MysqlDaemon interface {
Start(ctx context.Context) error
Shutdown(ctx context.Context, waitForMysqld bool) error
RunMysqlUpgrade() error
ReinitConfig(ctx context.Context) error
Wait(ctx context.Context) error

// GetMysqlPort returns the current port mysql is listening on.

@@ -250,6 +251,11 @@ func (fmd *FakeMysqlDaemon) RunMysqlUpgrade() error {
return nil
}

// ReinitConfig is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context) error {
return nil
}

// Wait is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) Wait(ctx context.Context) error {
return nil

@@ -29,6 +29,9 @@ type MysqlctlClient interface {
// RunMysqlUpgrade calls Mysqld.RunMysqlUpgrade remotely.
RunMysqlUpgrade(ctx context.Context) error

// ReinitConfig calls Mysqld.ReinitConfig remotely.
ReinitConfig(ctx context.Context) error

// Close will terminate the connection. This object won't be used anymore.
Close()
}

@@ -521,6 +521,34 @@ func (mysqld *Mysqld) initConfig(root string) error {
return ioutil.WriteFile(mysqld.config.path, []byte(configData), 0664)
}

// ReinitConfig updates the config file as if Mysqld is initializing. At the
// moment it only randomizes ServerID because it's not safe to restore a replica
// from a backup and then give it the same ServerID as before, MySQL can then
// skip transactions in the replication stream with the same server_id.
func (mysqld *Mysqld) ReinitConfig(ctx context.Context) error {
log.Infof("Mysqld.ReinitConfig")

// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.ReinitConfig() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.ReinitConfig(ctx)
}

if err := mysqld.config.RandomizeMysqlServerID(); err != nil {
return err
}
root, err := vtenv.VtRoot()
if err != nil {
return err
}
return mysqld.initConfig(root)
}

func (mysqld *Mysqld) createDirs() error {
log.Infof("creating directory %s", mysqld.tabletDir)
if err := os.MkdirAll(mysqld.tabletDir, os.ModePerm); err != nil {

@@ -44,6 +44,7 @@ import (
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletserver"

@@ -257,7 +258,7 @@ func NewActionAgent(
go func() {
// restoreFromBackup wil just be a regular action
// (same as if it was triggered remotely)
if err := agent.RestoreFromBackup(batchCtx); err != nil {
if err := agent.RestoreData(batchCtx, logutil.NewConsoleLogger(), false /* deleteBeforeRestore */); err != nil {
println(fmt.Sprintf("RestoreFromBackup failed: %v", err))
log.Fatalf("RestoreFromBackup failed: %v", err)
}

@@ -1098,6 +1098,7 @@ func agentRPCTestPromoteSlavePanic(ctx context.Context, t *testing.T, client tmc

var testBackupConcurrency = 24
var testBackupCalled = false
var testRestoreFromBackupCalled = false

func (fra *fakeRPCAgent) Backup(ctx context.Context, concurrency int, logger logutil.Logger) error {
if fra.panics {

@@ -1130,6 +1131,36 @@ func agentRPCTestBackupPanic(ctx context.Context, t *testing.T, client tmclient.
expectRPCWrapLockActionPanic(t, err)
}

func (fra *fakeRPCAgent) RestoreFromBackup(ctx context.Context, logger logutil.Logger) error {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
logStuff(logger, 10)
testRestoreFromBackupCalled = true
return nil
}

func agentRPCTestRestoreFromBackup(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
stream, err := client.RestoreFromBackup(ctx, tablet)
if err != nil {
t.Fatalf("RestoreFromBackup failed: %v", err)
}
err = compareLoggedStuff(t, "RestoreFromBackup", stream, 10)
compareError(t, "RestoreFromBackup", err, true, testRestoreFromBackupCalled)
}

func agentRPCTestRestoreFromBackupPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
stream, err := client.RestoreFromBackup(ctx, tablet)
if err != nil {
t.Fatalf("RestoreFromBackup failed: %v", err)
}
e, err := stream.Recv()
if err == nil {
t.Fatalf("Unexpected RestoreFromBackup logs: %v", e)
}
expectRPCWrapLockActionPanic(t, err)
}

//
// RPC helpers
//

@@ -1223,6 +1254,7 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.T

// Backup / restore related methods
agentRPCTestBackup(ctx, t, client, tablet)
agentRPCTestRestoreFromBackup(ctx, t, client, tablet)

//
// Tests panic handling everywhere now

@@ -1274,4 +1306,5 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.T

// Backup / restore related methods
agentRPCTestBackupPanic(ctx, t, client, tablet)
agentRPCTestRestoreFromBackupPanic(ctx, t, client, tablet)
}

@@ -279,3 +279,8 @@ func (e *eofEventStream) Recv() (*logutilpb.Event, error) {
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int) (logutil.EventStream, error) {
return &eofEventStream{}, nil
}

// RestoreFromBackup is part of the tmclient.TabletManagerClient interface.
func (client *FakeTabletManagerClient) RestoreFromBackup(ctx context.Context, tablet *topodatapb.Tablet) (logutil.EventStream, error) {
return &eofEventStream{}, nil
}

@@ -672,12 +672,12 @@ func (client *Client) PromoteSlave(ctx context.Context, tablet *topodatapb.Table
//
// Backup related methods
//
type eventStreamAdapter struct {
type backupStreamAdapter struct {
stream tabletmanagerservicepb.TabletManager_BackupClient
cc *grpc.ClientConn
}

func (e *eventStreamAdapter) Recv() (*logutilpb.Event, error) {
func (e *backupStreamAdapter) Recv() (*logutilpb.Event, error) {
br, err := e.stream.Recv()
if err != nil {
e.cc.Close()

@@ -700,7 +700,39 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, con
cc.Close()
return nil, err
}
return &eventStreamAdapter{
return &backupStreamAdapter{
stream: stream,
cc: cc,
}, nil
}

type restoreFromBackupStreamAdapter struct {
stream tabletmanagerservicepb.TabletManager_RestoreFromBackupClient
cc *grpc.ClientConn
}

func (e *restoreFromBackupStreamAdapter) Recv() (*logutilpb.Event, error) {
br, err := e.stream.Recv()
if err != nil {
e.cc.Close()
return nil, err
}
return br.Event, nil
}

// RestoreFromBackup is part of the tmclient.TabletManagerClient interface.
func (client *Client) RestoreFromBackup(ctx context.Context, tablet *topodatapb.Tablet) (logutil.EventStream, error) {
cc, c, err := client.dial(tablet)
if err != nil {
return nil, err
}

stream, err := c.RestoreFromBackup(ctx, &tabletmanagerdatapb.RestoreFromBackupRequest{})
if err != nil {
cc.Close()
return nil, err
}
return &restoreFromBackupStreamAdapter{
stream: stream,
cc: cc,
}, nil

@@ -456,6 +456,23 @@ func (s *server) Backup(request *tabletmanagerdatapb.BackupRequest, stream table
})
}

func (s *server) RestoreFromBackup(request *tabletmanagerdatapb.RestoreFromBackupRequest, stream tabletmanagerservicepb.TabletManager_RestoreFromBackupServer) error {
ctx := callinfo.GRPCCallInfo(stream.Context())
return s.agent.RPCWrapLockAction(ctx, tabletmanager.TabletActionRestoreFromBackup, request, nil, true, func() error {
// create a logger, send the result back to the caller
logger := logutil.NewCallbackLogger(func(e *logutilpb.Event) {
// If the client disconnects, we will just fail
// to send the log events, but won't interrupt
// the backup.
stream.Send(&tabletmanagerdatapb.RestoreFromBackupResponse{
Event: e,
})
})

return s.agent.RestoreFromBackup(ctx, logger)
})
}

// registration glue

func init() {

@@ -9,6 +9,7 @@ import (
"fmt"

log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/mysqlctl/replication"
"github.com/youtube/vitess/go/vt/topo/topoproto"

@@ -25,14 +26,17 @@ var (
restoreConcurrency = flag.Int("restore_concurrency", 4, "(init restore parameter) how many concurrent files to restore at once")
)

// RestoreFromBackup is the main entry point for backup restore.
// RestoreData is the main entry point for backup restore.
// It will either work, fail gracefully, or return
// an error in case of a non-recoverable error.
// It takes the action lock so no RPC interferes.
func (agent *ActionAgent) RestoreFromBackup(ctx context.Context) error {
func (agent *ActionAgent) RestoreData(ctx context.Context, logger logutil.Logger, deleteBeforeRestore bool) error {
agent.actionMutex.Lock()
defer agent.actionMutex.Unlock()
return agent.restoreDataLocked(ctx, logger, deleteBeforeRestore)
}

func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil.Logger, deleteBeforeRestore bool) error {
// change type to RESTORE (using UpdateTabletFields so it's
// always authorized)
tablet := agent.Tablet()

@@ -44,11 +48,16 @@ func (agent *ActionAgent) RestoreFromBackup(ctx context.Context) error {
return fmt.Errorf("Cannot change type to RESTORE: %v", err)
}

// let's update our internal state (stop query service and other things)
if err := agent.refreshTablet(ctx, "restore from backup"); err != nil {
return fmt.Errorf("failed to update state before restore: %v", err)
}

// Try to restore. Depending on the reason for failure, we may be ok.
// If we're not ok, return an error and the agent will log.Fatalf,
// causing the process to be restarted and the restore retried.
dir := fmt.Sprintf("%v/%v", tablet.Keyspace, tablet.Shard)
pos, err := mysqlctl.Restore(ctx, agent.MysqlDaemon, dir, *restoreConcurrency, agent.hookExtraEnv())
pos, err := mysqlctl.Restore(ctx, agent.MysqlDaemon, dir, *restoreConcurrency, agent.hookExtraEnv(), logger, deleteBeforeRestore)
switch err {
case nil:
// Populate local_metadata before starting replication,

@@ -82,6 +91,12 @@ func (agent *ActionAgent) RestoreFromBackup(ctx context.Context) error {
}); err != nil {
return fmt.Errorf("Cannot change type back to %v: %v", originalType, err)
}

// let's update our internal state (start query service and other things)
if err := agent.refreshTablet(ctx, "after restore from backup"); err != nil {
return fmt.Errorf("failed to update state after backup: %v", err)
}

return nil
}

@@ -160,6 +160,9 @@ const (

// TabletActionBackup takes a db backup and stores it into BackupStorage
TabletActionBackup TabletAction = "Backup"

// TabletActionRestoreFromBackup wipes the local data and restores anew from backup.
TabletActionRestoreFromBackup TabletAction = "RestoreFromBackup"
)

// RPCAgent defines the interface implemented by the Agent for RPCs.

@@ -253,6 +256,8 @@ type RPCAgent interface {

Backup(ctx context.Context, concurrency int, logger logutil.Logger) error

RestoreFromBackup(ctx context.Context, logger logutil.Logger) error

// RPC helpers
RPCWrap(ctx context.Context, name TabletAction, args, reply interface{}, f func() error) error
RPCWrapLock(ctx context.Context, name TabletAction, args, reply interface{}, verbose bool, f func() error) error

@@ -67,3 +67,26 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo

return returnErr
}

// RestoreFromBackup deletes all local data and restores anew from the latest backup.
// Should be called under RPCWrapLockAction.
func (agent *ActionAgent) RestoreFromBackup(ctx context.Context, logger logutil.Logger) error {
tablet, err := agent.TopoServer.GetTablet(ctx, agent.TabletAlias)
if err != nil {
return err
}
if tablet.Type == topodatapb.TabletType_MASTER {
return fmt.Errorf("type MASTER cannot restore from backup, if you really need to do this, restart vttablet in replica mode")
}

// create the loggers: tee to console and source
l := logutil.NewTeeLogger(logutil.NewConsoleLogger(), logger)

// now we can run restore
err = agent.restoreDataLocked(ctx, l, true /* deleteBeforeRestore */)

// re-run health check to be sure to capture any replication delay
agent.runHealthCheckProtected()

return err
}

@@ -182,6 +182,9 @@ type TabletManagerClient interface {

// Backup creates a database backup
Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int) (logutil.EventStream, error)

// RestoreFromBackup deletes local data and restores database from backup
RestoreFromBackup(ctx context.Context, tablet *topodatapb.Tablet) (logutil.EventStream, error)
}

// TabletManagerClientFactory is the factory method to create

@@ -85,10 +85,14 @@ func (util *testUtils) checkTabletError(t *testing.T, err interface{}, tabletErr
}

func (util *testUtils) newMysqld(dbconfigs *dbconfigs.DBConfigs) mysqlctl.MysqlDaemon {
cnf := mysqlctl.NewMycnf(11111, 6802)
// Assigning ServerID to be different from tablet UID to make sure that there are no
// assumptions in the code that those IDs are the same.
cnf.ServerID = 22222
return mysqlctl.NewMysqld(
"",
"",
mysqlctl.NewMycnf(11111, 22222, 6802),
cnf,
&dbconfigs.Dba,
&dbconfigs.App.ConnParams,
&dbconfigs.Repl,

@@ -7,7 +7,9 @@ package vtctl
import (
"flag"
"fmt"
"io"

"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/backupstorage"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"

@@ -25,6 +27,12 @@ func init() {
commandRemoveBackup,
"<keyspace/shard> <backup name>",
"Removes a backup for the BackupStorage."})

addCommand("Tablets", command{
"RestoreFromBackup",
commandRestoreFromBackup,
"<tablet alias>",
"Stops mysqld and restores the data from the latest backup."})
}

func commandListBackups(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {

@@ -78,3 +86,36 @@ func commandRemoveBackup(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
defer bs.Close()
return bs.RemoveBackup(bucket, name)
}

func commandRestoreFromBackup(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("The RestoreFromBackup command requires the <tablet alias> argument.")
}

tabletAlias, err := topoproto.ParseTabletAlias(subFlags.Arg(0))
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(ctx, tabletAlias)
if err != nil {
return err
}
stream, err := wr.TabletManagerClient().RestoreFromBackup(ctx, tabletInfo.Tablet)
if err != nil {
return err
}
for {
e, err := stream.Recv()
switch err {
case nil:
logutil.LogEvent(wr.Logger(), e)
case io.EOF:
return nil
default:
return err
}
}
}

@@ -153,8 +153,8 @@ func TestBackupRestore(t *testing.T) {
destTablet.StartActionLoop(t, wr)
defer destTablet.StopActionLoop(t)

if err := destTablet.Agent.RestoreFromBackup(ctx); err != nil {
t.Fatalf("RestoreFromBackup failed: %v", err)
if err := destTablet.Agent.RestoreData(ctx, logutil.NewConsoleLogger(), false /* deleteBeforeRestore */); err != nil {
t.Fatalf("RestoreData failed: %v", err)
}

// verify the full status

@@ -19,9 +19,14 @@ message RunMysqlUpgradeRequest{}

message RunMysqlUpgradeResponse{}

message ReinitConfigRequest{}

message ReinitConfigResponse{}

// MysqlCtl is the service definition
service MysqlCtl {
rpc Start(StartRequest) returns (StartResponse) {};
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse) {};
rpc RunMysqlUpgrade(RunMysqlUpgradeRequest) returns (RunMysqlUpgradeResponse) {};
rpc ReinitConfig(ReinitConfigRequest) returns (ReinitConfigResponse) {};
}

@@ -404,3 +404,10 @@ message BackupRequest {
message BackupResponse {
logutil.Event event = 1;
}

message RestoreFromBackupRequest {
}

message RestoreFromBackupResponse {
logutil.Event event = 1;
}

@@ -165,4 +165,7 @@ service TabletManager {
//

rpc Backup(tabletmanagerdata.BackupRequest) returns (stream tabletmanagerdata.BackupResponse) {};

// RestoreFromBackup deletes all local data and restores it from the latest backup.
rpc RestoreFromBackup(tabletmanagerdata.RestoreFromBackupRequest) returns (stream tabletmanagerdata.RestoreFromBackupResponse) {};
}

@@ -242,12 +242,16 @@ class TestBackup(unittest.TestCase):

tablet_replica2.kill_vttablet()

def test_restore_old_master(self):
def _restore_old_master_test(self, restore_method):
"""Test that a former master replicates correctly after being restored.

- Take a backup.
- Reparent from old master to new master.
- Delete the old master and force it to restore from a previous backup.
- Force old master to restore from a previous backup using restore_method.

Args:
restore_method: function accepting one parameter of type tablet.Tablet,
this function is called to force a restore on the provided tablet
"""

# insert data on master, wait for slave to get it

@@ -269,11 +273,23 @@ class TestBackup(unittest.TestCase):
self._insert_data(tablet_replica1, 3)

# force the old master to restore at the latest backup.
tablet_master.kill_vttablet()
self._restore(tablet_master)
restore_method(tablet_master)

# wait for it to catch up.
self._check_data(tablet_master, 3, 'former master catches up after restore')

def test_restore_old_master(self):
def _restore_using_kill(t):
t.kill_vttablet()
self._restore(t)

self._restore_old_master_test(_restore_using_kill)

def test_in_place_restore(self):
def _restore_in_place(t):
utils.run_vtctl(['RestoreFromBackup', t.tablet_alias], auto_log=True)

self._restore_old_master_test(_restore_in_place)

if __name__ == '__main__':
utils.main()