Merge pull request #749 from youtube/replication

Replication
This commit is contained in:
Alain Jobart 2015-06-04 06:46:44 -07:00
Родитель 6ff9f5b23c ea93cb3343
Коммит 408a039aff
10 изменённых файлов: 221 добавлений и 84 удалений

Просмотреть файл

@ -9,6 +9,7 @@ import (
"flag"
"fmt"
"os"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/exit"
@ -17,6 +18,7 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"golang.org/x/net/context"
// import mysql to register mysql connection function
_ "github.com/youtube/vitess/go/mysql"
@ -32,41 +34,50 @@ var (
)
func initCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup")
waitTime := subFlags.Duration("wait_time", 2*time.Minute, "how long to wait for startup")
bootstrapArchive := subFlags.String("bootstrap_archive", "mysql-db-dir.tbz", "name of bootstrap archive within vitess/data/bootstrap directory")
subFlags.Parse(args)
if err := mysqld.Init(*waitTime, *bootstrapArchive); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Init(ctx, *bootstrapArchive); err != nil {
return fmt.Errorf("failed init mysql: %v", err)
}
return nil
}
func shutdownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown")
waitTime := subFlags.Duration("wait_time", 2*time.Minute, "how long to wait for shutdown")
subFlags.Parse(args)
if err := mysqld.Shutdown(true, *waitTime); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Shutdown(ctx, true); err != nil {
return fmt.Errorf("failed shutdown mysql: %v", err)
}
return nil
}
func startCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup")
waitTime := subFlags.Duration("wait_time", 2*time.Minute, "how long to wait for startup")
subFlags.Parse(args)
if err := mysqld.Start(*waitTime); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Start(ctx); err != nil {
return fmt.Errorf("failed start mysql: %v", err)
}
return nil
}
func teardownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", 2*time.Minute, "how long to wait for shutdown")
force := subFlags.Bool("force", false, "will remove the root directory even if mysqld shutdown fails")
subFlags.Parse(args)
if err := mysqld.Teardown(*force); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Teardown(ctx, *force); err != nil {
return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err)
}
return nil

Просмотреть файл

@ -10,6 +10,7 @@ package main
import (
"flag"
"os"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/exit"
@ -17,6 +18,7 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
"golang.org/x/net/context"
// import mysql to register mysql connection function
_ "github.com/youtube/vitess/go/mysql"
@ -31,7 +33,7 @@ var (
mysqlSocket = flag.String("mysql_socket", "", "path to the mysql socket")
// mysqlctl init flags
waitTime = flag.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for mysqld startup or shutdown")
waitTime = flag.Duration("wait_time", 2*time.Minute, "how long to wait for mysqld startup or shutdown")
bootstrapArchive = flag.String("bootstrap_archive", "mysql-db-dir.tbz", "name of bootstrap archive within vitess/data/bootstrap directory")
)
@ -72,13 +74,15 @@ func main() {
})
// Start or Init mysqld as needed.
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
if _, err = os.Stat(mycnf.DataDir); os.IsNotExist(err) {
log.Infof("mysql data dir (%s) doesn't exist, initializing", mycnf.DataDir)
mysqld.Init(*waitTime, *bootstrapArchive)
mysqld.Init(ctx, *bootstrapArchive)
} else {
log.Infof("mysql data dir (%s) already exists, starting without init", mycnf.DataDir)
mysqld.Start(*waitTime)
mysqld.Start(ctx)
}
cancel()
servenv.Init()
defer servenv.Close()
@ -86,7 +90,8 @@ func main() {
// Take mysqld down with us on SIGTERM before entering lame duck.
servenv.OnTerm(func() {
log.Infof("mysqlctl received SIGTERM, shutting down mysqld first")
mysqld.Shutdown(false, 0)
ctx := context.Background()
mysqld.Shutdown(ctx, false)
})
// Start RPC server and wait for SIGTERM.

Просмотреть файл

@ -17,6 +17,7 @@ import (
"sync"
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/cgzip"
"github.com/youtube/vitess/go/sync2"
@ -180,7 +181,7 @@ func findFilesTobackup(cnf *Mycnf) ([]FileEntry, error) {
// - uses the BackupStorage service to store a new backup
// - shuts down Mysqld during the backup
// - remember if we were replicating, restore the exact same state
func Backup(mysqld MysqlDaemon, logger logutil.Logger, bucket, name string, backupConcurrency int, hookExtraEnv map[string]string) error {
func Backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bucket, name string, backupConcurrency int, hookExtraEnv map[string]string) error {
// start the backup with the BackupStorage
bs, err := backupstorage.GetBackupStorage()
@ -192,7 +193,7 @@ func Backup(mysqld MysqlDaemon, logger logutil.Logger, bucket, name string, back
return fmt.Errorf("StartBackup failed: %v", err)
}
if err = backup(mysqld, logger, bh, backupConcurrency, hookExtraEnv); err != nil {
if err = backup(ctx, mysqld, logger, bh, backupConcurrency, hookExtraEnv); err != nil {
if abortErr := bh.AbortBackup(); abortErr != nil {
logger.Errorf("failed to abort backup: %v", abortErr)
}
@ -201,7 +202,7 @@ func Backup(mysqld MysqlDaemon, logger logutil.Logger, bucket, name string, back
return bh.EndBackup()
}
func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) error {
func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) error {
// save initial state so we can restore
slaveStartRequired := false
@ -254,7 +255,8 @@ func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHa
logger.Infof("using replication position: %v", replicationPosition)
// shutdown mysqld
if err = mysqld.Shutdown(true, MysqlWaitTime); err != nil {
err = mysqld.Shutdown(ctx, true)
if err != nil {
return fmt.Errorf("cannot shutdown mysqld: %v", err)
}
@ -271,7 +273,8 @@ func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHa
}
// Try to restart mysqld
if err := mysqld.Start(MysqlWaitTime); err != nil {
err = mysqld.Start(ctx)
if err != nil {
return fmt.Errorf("cannot restart mysqld: %v", err)
}
@ -497,7 +500,7 @@ func restoreFiles(cnf *Mycnf, bh backupstorage.BackupHandle, fes []FileEntry, re
// Restore is the main entry point for backup restore. If there is no
// appropriate backup on the BackupStorage, Restore logs an error
// and returns ErrNoBackup. Any other error is returned.
func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtraEnv map[string]string) (proto.ReplicationPosition, error) {
func Restore(ctx context.Context, mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtraEnv map[string]string) (proto.ReplicationPosition, error) {
// find the right backup handle: most recent one, with a MANIFEST
log.Infof("Restore: looking for a suitable backup to restore")
bs, err := backupstorage.GetBackupStorage()
@ -539,7 +542,8 @@ func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtr
}
log.Infof("Restore: shutdown mysqld")
if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil {
err = mysqld.Shutdown(ctx, true)
if err != nil {
return proto.ReplicationPosition{}, err
}
@ -548,8 +552,14 @@ func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtr
return proto.ReplicationPosition{}, err
}
log.Infof("Restore: running mysql_upgrade if necessary")
if err := mysqld.RunMysqlUpgrade(); err != nil {
return proto.ReplicationPosition{}, err
}
log.Infof("Restore: restart mysqld")
if err := mysqld.Start(MysqlWaitTime); err != nil {
err = mysqld.Start(ctx)
if err != nil {
return proto.ReplicationPosition{}, err
}

Просмотреть файл

@ -4,6 +4,7 @@
// Package gorpcmysqlctlclient contains the go rpc version of the mysqlctl
// client protocol.
// Since gorpc doesn't forward context deadline, we forward them manually.
package gorpcmysqlctlclient
import (
@ -12,43 +13,62 @@ import (
"golang.org/x/net/context"
rpc "github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/mysqlctl/mysqlctlclient"
"github.com/youtube/vitess/go/vt/rpc"
)
type goRpcMysqlctlClient struct {
rpcClient *rpc.Client
type goRPCMysqlctlClient struct {
rpcClient *rpcplus.Client
}
func goRpcMysqlctlClientFactory(network, addr string, dialTimeout time.Duration) (mysqlctlclient.MysqlctlClient, error) {
func goRPCMysqlctlClientFactory(network, addr string, dialTimeout time.Duration) (mysqlctlclient.MysqlctlClient, error) {
// create the RPC client
rpcClient, err := bsonrpc.DialHTTP(network, addr, dialTimeout, nil)
if err != nil {
return nil, fmt.Errorf("RPC error for %v: %v", addr, err)
}
return &goRpcMysqlctlClient{rpcClient}, nil
return &goRPCMysqlctlClient{rpcClient}, nil
}
// Start is part of the MysqlctlClient interface.
func (c *goRpcMysqlctlClient) Start(mysqlWaitTime time.Duration) error {
return c.rpcClient.Call(context.TODO(), "MysqlctlServer.Start", &mysqlWaitTime, nil)
func (c *goRPCMysqlctlClient) Start(ctx context.Context) error {
var timeout time.Duration
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout <= 0 {
return fmt.Errorf("deadline exceeded")
}
}
return c.rpcClient.Call(ctx, "MysqlctlServer.Start", &timeout, &rpc.Unused{})
}
// Shutdown is part of the MysqlctlClient interface.
func (c *goRpcMysqlctlClient) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error {
if !waitForMysqld {
mysqlWaitTime = 0
func (c *goRPCMysqlctlClient) Shutdown(ctx context.Context, waitForMysqld bool) error {
var timeout time.Duration
if waitForMysqld {
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout <= 0 {
return fmt.Errorf("deadline exceeded")
}
}
}
return c.rpcClient.Call(context.TODO(), "MysqlctlServer.Shutdown", &mysqlWaitTime, nil)
return c.rpcClient.Call(ctx, "MysqlctlServer.Shutdown", &timeout, &rpc.Unused{})
}
// RunMysqlUpgrade is part of the MysqlctlClient interface.
func (c *goRPCMysqlctlClient) RunMysqlUpgrade(ctx context.Context) error {
return c.rpcClient.Call(ctx, "MysqlctlServer.RunMysqlUpgrade", &rpc.Unused{}, &rpc.Unused{})
}
// Close is part of the MysqlctlClient interface.
func (client *goRpcMysqlctlClient) Close() {
client.rpcClient.Close()
func (c *goRPCMysqlctlClient) Close() {
c.rpcClient.Close()
}
func init() {
mysqlctlclient.RegisterMysqlctlClientFactory("gorpc", goRpcMysqlctlClientFactory)
mysqlctlclient.RegisterFactory("gorpc", goRPCMysqlctlClientFactory)
}

Просмотреть файл

@ -12,6 +12,7 @@ import (
"time"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/rpc"
"github.com/youtube/vitess/go/vt/servenv"
"golang.org/x/net/context"
)
@ -22,13 +23,32 @@ type MysqlctlServer struct {
}
// Start implements the server side of the MysqlctlClient interface.
func (s *MysqlctlServer) Start(ctx context.Context, args *time.Duration, reply *int) error {
return s.mysqld.Start(*args)
func (s *MysqlctlServer) Start(ctx context.Context, args *time.Duration, reply *rpc.Unused) error {
if *args != 0 {
// if a duration was passed in, add it to the Context.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *args)
defer cancel()
}
return s.mysqld.Start(ctx)
}
// Shutdown implements the server side of the MysqlctlClient interface.
func (s *MysqlctlServer) Shutdown(ctx context.Context, args *time.Duration, reply *int) error {
return s.mysqld.Shutdown(*args > 0, *args)
func (s *MysqlctlServer) Shutdown(ctx context.Context, args *time.Duration, reply *rpc.Unused) error {
waitForMysqld := false
if *args != 0 {
// if a duration was passed in, add it to the Context.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *args)
defer cancel()
waitForMysqld = true
}
return s.mysqld.Shutdown(ctx, waitForMysqld)
}
// RunMysqlUpgrade implements the server side of the MysqlctlClient interface.
func (s *MysqlctlServer) RunMysqlUpgrade(ctx context.Context, args *rpc.Unused, reply *rpc.Unused) error {
return s.mysqld.RunMysqlUpgrade()
}
// StartServer registers the Server for RPCs.

Просмотреть файл

@ -24,8 +24,9 @@ type MysqlDaemon interface {
Cnf() *Mycnf
// methods related to mysql running or not
Start(mysqlWaitTime time.Duration) error
Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error
Start(ctx context.Context) error
Shutdown(ctx context.Context, waitForMysqld bool) error
RunMysqlUpgrade() error
// GetMysqlPort returns the current port mysql is listening on.
GetMysqlPort() (int, error)
@ -196,7 +197,7 @@ func (fmd *FakeMysqlDaemon) Cnf() *Mycnf {
}
// Start is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Start(mysqlWaitTime time.Duration) error {
func (fmd *FakeMysqlDaemon) Start(ctx context.Context) error {
if fmd.Running {
return fmt.Errorf("fake mysql daemon already running")
}
@ -205,7 +206,7 @@ func (fmd *FakeMysqlDaemon) Start(mysqlWaitTime time.Duration) error {
}
// Shutdown is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error {
func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, waitForMysqld bool) error {
if !fmd.Running {
return fmt.Errorf("fake mysql daemon not running")
}
@ -213,6 +214,11 @@ func (fmd *FakeMysqlDaemon) Shutdown(waitForMysqld bool, mysqlWaitTime time.Dura
return nil
}
// RunMysqlUpgrade is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) RunMysqlUpgrade() error {
return nil
}
// GetMysqlPort is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetMysqlPort() (int, error) {
if fmd.MysqlPort == -1 {

Просмотреть файл

@ -12,39 +12,45 @@ import (
"time"
log "github.com/golang/glog"
"golang.org/x/net/context"
)
var mysqlctlClientProtocol = flag.String("mysqlctl_client_protocol", "gorpc", "the protocol to use to talk to the mysqlctl server")
var protocol = flag.String("mysqlctl_client_protocol", "gorpc", "the protocol to use to talk to the mysqlctl server")
var connectionTimeout = flag.Duration("mysqlctl_client_connection_timeout", 30*time.Second, "the connection timeout to use to talk to the mysqlctl server")
// MysqlctlClient defines the interface used to send remote mysqlctl commands
type MysqlctlClient interface {
// Start calls Mysqld.Start remotely.
Start(mysqlWaitTime time.Duration) error
Start(ctx context.Context) error
// Shutdown calls Mysqld.Shutdown remotely.
Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error
Shutdown(ctx context.Context, waitForMysqld bool) error
// RunMysqlUpgrade calls Mysqld.RunMysqlUpgrade remotely.
RunMysqlUpgrade(ctx context.Context) error
// Close will terminate the connection. This object won't be used anymore.
Close()
}
// MysqlctlClientFactory functions are registered by client implementations.
type MysqlctlClientFactory func(network, addr string, dialTimeout time.Duration) (MysqlctlClient, error)
// Factory functions are registered by client implementations.
type Factory func(network, addr string, dialTimeout time.Duration) (MysqlctlClient, error)
var mysqlctlClientFactories = make(map[string]MysqlctlClientFactory)
var factories = make(map[string]Factory)
// RegisterMysqlctlClientFactory allows a client implementation to register itself
func RegisterMysqlctlClientFactory(name string, factory MysqlctlClientFactory) {
if _, ok := mysqlctlClientFactories[name]; ok {
log.Fatalf("RegisterMysqlctlClientFactory %s already exists", name)
// RegisterFactory allows a client implementation to register itself
func RegisterFactory(name string, factory Factory) {
if _, ok := factories[name]; ok {
log.Fatalf("RegisterFactory %s already exists", name)
}
mysqlctlClientFactories[name] = factory
factories[name] = factory
}
// New creates a client implementation as specified by a flag.
func New(network, addr string, dialTimeout time.Duration) (MysqlctlClient, error) {
factory, ok := mysqlctlClientFactories[*mysqlctlClientProtocol]
func New(network, addr string) (MysqlctlClient, error) {
factory, ok := factories[*protocol]
if !ok {
return nil, fmt.Errorf("unknown mysqlctl client protocol: %v", *mysqlctlClientProtocol)
return nil, fmt.Errorf("unknown mysqlctl client protocol: %v", *protocol)
}
return factory(network, addr, dialTimeout)
return factory(network, addr, *connectionTimeout)
}

Просмотреть файл

@ -34,11 +34,7 @@ import (
vtenv "github.com/youtube/vitess/go/vt/env"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/mysqlctl/mysqlctlclient"
)
const (
// MysqlWaitTime is the default number of seconds to wait for mysql
MysqlWaitTime = 120 * time.Second
"golang.org/x/net/context"
)
var (
@ -120,19 +116,71 @@ func (mysqld *Mysqld) Cnf() *Mycnf {
return mysqld.config
}
// Start will start the mysql daemon, either by running the 'mysqld_start'
// hook, or by running mysqld_safe in the background.
// If a mysqlctld address is provided in a flag, Start will run remotely.
func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
// RunMysqlUpgrade will run the mysql_upgrade program on the current install.
// Will not be called when mysqld is running.
func (mysqld *Mysqld) RunMysqlUpgrade() error {
// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.Start() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile, mysqlWaitTime)
log.Infof("executing Mysqld.RunMysqlUpgrade() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.Start(mysqlWaitTime)
return client.RunMysqlUpgrade(context.TODO())
}
// find mysql_upgrade. If not there, we do nothing.
dir, err := vtenv.VtMysqlRoot()
if err != nil {
log.Warningf("VT_MYSQL_ROOT not set, skipping mysql_upgrade step: %v", err)
return nil
}
name := path.Join(dir, "bin/mysql_upgrade")
if _, err := os.Stat(name); err != nil {
log.Warningf("mysql_upgrade binary not present, skipping it: %v", err)
return nil
}
// run the program, if it fails, we fail
cmd := exec.Command(name, "--defaults-file="+mysqld.config.path)
cmd.Env = []string{os.ExpandEnv("LD_LIBRARY_PATH=$VT_MYSQL_ROOT/lib/mysql")}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil
}
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Infof("mysql_upgrade stderr: %v", scanner.Text())
}
}()
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
log.Infof("mysql_upgrade stdout: %v", scanner.Text())
}
}()
return cmd.Run()
}
// Start will start the mysql daemon, either by running the 'mysqld_start'
// hook, or by running mysqld_safe in the background.
// If a mysqlctld address is provided in a flag, Start will run remotely.
func (mysqld *Mysqld) Start(ctx context.Context) error {
// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.Start() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.Start(ctx)
}
var name string
@ -158,7 +206,7 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
cmd := exec.Command(name, arg...)
cmd.Dir = dir
cmd.Env = env
log.Infof("%v mysqlWaitTime:%v %#v", ts, mysqlWaitTime, cmd)
log.Infof("%v %#v", ts, cmd)
stderr, err := cmd.StderrPipe()
if err != nil {
return nil
@ -210,7 +258,13 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
// give it some time to succeed - usually by the time the socket emerges
// we are in good shape
for i := mysqlWaitTime; i >= 0; i -= time.Second {
for {
select {
case <-ctx.Done():
return errors.New(name + ": deadline exceeded waiting for " + mysqld.config.SocketFile)
default:
}
_, statErr := os.Stat(mysqld.config.SocketFile)
if statErr == nil {
// Make sure the socket file isn't stale.
@ -225,7 +279,6 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
log.Infof("%v: sleeping for 1s waiting for socket file %v", ts, mysqld.config.SocketFile)
time.Sleep(time.Second)
}
return errors.New(name + ": deadline exceeded waiting for " + mysqld.config.SocketFile)
}
// Shutdown will stop the mysqld daemon that is running in the background.
@ -235,18 +288,18 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
// flushed - on the order of 20-30 minutes.
//
// If a mysqlctld address is provided in a flag, Shutdown will run remotely.
func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error {
func (mysqld *Mysqld) Shutdown(ctx context.Context, waitForMysqld bool) error {
log.Infof("Mysqld.Shutdown")
// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.Shutdown() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile, mysqlWaitTime)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.Shutdown(waitForMysqld, mysqlWaitTime)
return client.Shutdown(ctx, waitForMysqld)
}
// We're shutting down on purpose. We no longer want to be notified when
@ -295,10 +348,17 @@ func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration)
return fmt.Errorf("mysqld_shutdown hook failed: %v", hr.String())
}
// wait for mysqld to really stop. use the sock file as a proxy for that since
// we can't call wait() in a process we didn't start.
// Wait for mysqld to really stop. Use the sock file as a
// proxy for that since we can't call wait() in a process we
// didn't start.
if waitForMysqld {
for i := mysqlWaitTime; i >= 0; i -= time.Second {
for {
select {
case <-ctx.Done():
return errors.New("gave up waiting for mysqld to stop")
default:
}
_, statErr := os.Stat(mysqld.config.SocketFile)
if statErr != nil && os.IsNotExist(statErr) {
return nil
@ -306,7 +366,6 @@ func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration)
log.Infof("Mysqld.Shutdown: sleeping for 1s waiting for socket file %v", mysqld.config.SocketFile)
time.Sleep(time.Second)
}
return errors.New("gave up waiting for mysqld to stop")
}
return nil
}
@ -330,7 +389,7 @@ func execCmd(name string, args, env []string, dir string) (cmd *exec.Cmd, err er
// Init will create the default directory structure for the mysqld process,
// generate / configure a my.cnf file, unpack a skeleton database,
// and create some management tables.
func (mysqld *Mysqld) Init(mysqlWaitTime time.Duration, bootstrapArchive string) error {
func (mysqld *Mysqld) Init(ctx context.Context, bootstrapArchive string) error {
log.Infof("mysqlctl.Init")
err := mysqld.createDirs()
if err != nil {
@ -359,7 +418,7 @@ func (mysqld *Mysqld) Init(mysqlWaitTime time.Duration, bootstrapArchive string)
}
// Start mysqld.
if err = mysqld.Start(mysqlWaitTime); err != nil {
if err = mysqld.Start(ctx); err != nil {
log.Errorf("failed starting, check %v", mysqld.config.ErrorLogPath)
return err
}
@ -449,9 +508,9 @@ func (mysqld *Mysqld) createTopDir(dir string) error {
}
// Teardown will shutdown the running daemon, and delete the root directory.
func (mysqld *Mysqld) Teardown(force bool) error {
func (mysqld *Mysqld) Teardown(ctx context.Context, force bool) error {
log.Infof("mysqlctl.Teardown")
if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil {
if err := mysqld.Shutdown(ctx, true); err != nil {
log.Warningf("failed mysqld shutdown: %v", err.Error())
if !force {
return err

Просмотреть файл

@ -728,7 +728,7 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo
// now we can run the backup
bucket := fmt.Sprintf("%v/%v", tablet.Keyspace, tablet.Shard)
name := fmt.Sprintf("%v.%v", tablet.Alias, time.Now().UTC().Format("2006-01-02.150405"))
returnErr := mysqlctl.Backup(agent.MysqlDaemon, l, bucket, name, concurrency, agent.hookExtraEnv())
returnErr := mysqlctl.Backup(ctx, agent.MysqlDaemon, l, bucket, name, concurrency, agent.hookExtraEnv())
// and change our type back to the appropriate value:
// - if healthcheck is enabled, go to spare

Просмотреть файл

@ -44,7 +44,7 @@ func (agent *ActionAgent) RestoreFromBackup(ctx context.Context) error {
// do the optional restore, if that fails we are in a bad state,
// just log.Fatalf out.
bucket := fmt.Sprintf("%v/%v", tablet.Keyspace, tablet.Shard)
pos, err := mysqlctl.Restore(agent.MysqlDaemon, bucket, *restoreConcurrency, agent.hookExtraEnv())
pos, err := mysqlctl.Restore(ctx, agent.MysqlDaemon, bucket, *restoreConcurrency, agent.hookExtraEnv())
if err != nil && err != mysqlctl.ErrNoBackup {
return fmt.Errorf("Cannot restore original backup: %v", err)
}