Now using context in mysqlctld communication, and in

mysql Start/Shutdown.
This commit is contained in:
Alain Jobart 2015-06-03 14:21:39 -07:00
Родитель a72346bd85
Коммит 374bb645a6
8 изменённых файлов: 121 добавлений и 52 удалений

Просмотреть файл

@ -17,6 +17,7 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"golang.org/x/net/context"
// import mysql to register mysql connection function
_ "github.com/youtube/vitess/go/mysql"
@ -36,7 +37,9 @@ func initCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) err
bootstrapArchive := subFlags.String("bootstrap_archive", "mysql-db-dir.tbz", "name of bootstrap archive within vitess/data/bootstrap directory")
subFlags.Parse(args)
if err := mysqld.Init(*waitTime, *bootstrapArchive); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Init(ctx, *bootstrapArchive); err != nil {
return fmt.Errorf("failed init mysql: %v", err)
}
return nil
@ -46,7 +49,9 @@ func shutdownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string)
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown")
subFlags.Parse(args)
if err := mysqld.Shutdown(true, *waitTime); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Shutdown(ctx, true); err != nil {
return fmt.Errorf("failed shutdown mysql: %v", err)
}
return nil
@ -56,17 +61,22 @@ func startCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) er
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup")
subFlags.Parse(args)
if err := mysqld.Start(*waitTime); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Start(ctx); err != nil {
return fmt.Errorf("failed start mysql: %v", err)
}
return nil
}
func teardownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error {
waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown")
force := subFlags.Bool("force", false, "will remove the root directory even if mysqld shutdown fails")
subFlags.Parse(args)
if err := mysqld.Teardown(*force); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
defer cancel()
if err := mysqld.Teardown(ctx, *force); err != nil {
return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err)
}
return nil

Просмотреть файл

@ -17,6 +17,7 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
"golang.org/x/net/context"
// import mysql to register mysql connection function
_ "github.com/youtube/vitess/go/mysql"
@ -72,13 +73,15 @@ func main() {
})
// Start or Init mysqld as needed.
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
if _, err = os.Stat(mycnf.DataDir); os.IsNotExist(err) {
log.Infof("mysql data dir (%s) doesn't exist, initializing", mycnf.DataDir)
mysqld.Init(*waitTime, *bootstrapArchive)
mysqld.Init(ctx, *bootstrapArchive)
} else {
log.Infof("mysql data dir (%s) already exists, starting without init", mycnf.DataDir)
mysqld.Start(*waitTime)
mysqld.Start(ctx)
}
cancel()
servenv.Init()
defer servenv.Close()
@ -86,7 +89,8 @@ func main() {
// Take mysqld down with us on SIGTERM before entering lame duck.
servenv.OnTerm(func() {
log.Infof("mysqlctl received SIGTERM, shutting down mysqld first")
mysqld.Shutdown(false, 0)
ctx := context.Background()
mysqld.Shutdown(ctx, false)
})
// Start RPC server and wait for SIGTERM.

Просмотреть файл

@ -17,6 +17,7 @@ import (
"sync"
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/cgzip"
"github.com/youtube/vitess/go/sync2"
@ -254,7 +255,10 @@ func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHa
logger.Infof("using replication position: %v", replicationPosition)
// shutdown mysqld
if err = mysqld.Shutdown(true, MysqlWaitTime); err != nil {
ctx, cancel := context.WithTimeout(context.TODO(), MysqlWaitTime)
err = mysqld.Shutdown(ctx, true)
cancel()
if err != nil {
return fmt.Errorf("cannot shutdown mysqld: %v", err)
}
@ -271,7 +275,10 @@ func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHa
}
// Try to restart mysqld
if err := mysqld.Start(MysqlWaitTime); err != nil {
ctx, cancel = context.WithTimeout(context.TODO(), MysqlWaitTime)
err = mysqld.Start(ctx)
cancel()
if err != nil {
return fmt.Errorf("cannot restart mysqld: %v", err)
}
@ -539,7 +546,10 @@ func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtr
}
log.Infof("Restore: shutdown mysqld")
if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil {
ctx, cancel := context.WithTimeout(context.TODO(), MysqlWaitTime)
err = mysqld.Shutdown(ctx, true)
cancel()
if err != nil {
return proto.ReplicationPosition{}, err
}
@ -554,7 +564,10 @@ func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtr
}
log.Infof("Restore: restart mysqld")
if err := mysqld.Start(MysqlWaitTime); err != nil {
ctx, cancel = context.WithTimeout(context.TODO(), MysqlWaitTime)
err = mysqld.Start(ctx)
cancel()
if err != nil {
return proto.ReplicationPosition{}, err
}

Просмотреть файл

@ -4,6 +4,7 @@
// Package gorpcmysqlctlclient contains the go rpc version of the mysqlctl
// client protocol.
// Since gorpc doesn't forward context deadline, we forward them manually.
package gorpcmysqlctlclient
import (
@ -33,21 +34,34 @@ func goRPCMysqlctlClientFactory(network, addr string, dialTimeout time.Duration)
}
// Start is part of the MysqlctlClient interface.
func (c *goRPCMysqlctlClient) Start(mysqlWaitTime time.Duration) error {
return c.rpcClient.Call(context.TODO(), "MysqlctlServer.Start", &mysqlWaitTime, &rpc.Unused{})
func (c *goRPCMysqlctlClient) Start(ctx context.Context) error {
var timeout time.Duration
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout <= 0 {
return fmt.Errorf("deadline exceeded")
}
}
return c.rpcClient.Call(ctx, "MysqlctlServer.Start", &timeout, &rpc.Unused{})
}
// Shutdown is part of the MysqlctlClient interface.
func (c *goRPCMysqlctlClient) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error {
if !waitForMysqld {
mysqlWaitTime = 0
func (c *goRPCMysqlctlClient) Shutdown(ctx context.Context, waitForMysqld bool) error {
var timeout time.Duration
if waitForMysqld {
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout <= 0 {
return fmt.Errorf("deadline exceeded")
}
}
}
return c.rpcClient.Call(context.TODO(), "MysqlctlServer.Shutdown", &mysqlWaitTime, &rpc.Unused{})
return c.rpcClient.Call(ctx, "MysqlctlServer.Shutdown", &timeout, &rpc.Unused{})
}
// RunMysqlUpgrade is part of the MysqlctlClient interface.
func (c *goRPCMysqlctlClient) RunMysqlUpgrade() error {
return c.rpcClient.Call(context.TODO(), "MysqlctlServer.RunMysqlUpgrade", &rpc.Unused{}, &rpc.Unused{})
func (c *goRPCMysqlctlClient) RunMysqlUpgrade(ctx context.Context) error {
return c.rpcClient.Call(ctx, "MysqlctlServer.RunMysqlUpgrade", &rpc.Unused{}, &rpc.Unused{})
}
// Close is part of the MysqlctlClient interface.

Просмотреть файл

@ -24,12 +24,26 @@ type MysqlctlServer struct {
// Start implements the server side of the MysqlctlClient interface.
func (s *MysqlctlServer) Start(ctx context.Context, args *time.Duration, reply *rpc.Unused) error {
return s.mysqld.Start(*args)
if *args != 0 {
// if a duration was passed in, add it to the Context.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *args)
defer cancel()
}
return s.mysqld.Start(ctx)
}
// Shutdown implements the server side of the MysqlctlClient interface.
func (s *MysqlctlServer) Shutdown(ctx context.Context, args *time.Duration, reply *rpc.Unused) error {
return s.mysqld.Shutdown(*args > 0, *args)
waitForMysqld := false
if *args != 0 {
// if a duration was passed in, add it to the Context.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *args)
defer cancel()
waitForMysqld = true
}
return s.mysqld.Shutdown(ctx, waitForMysqld)
}
// RunMysqlUpgrade implements the server side of the MysqlctlClient interface.

Просмотреть файл

@ -24,8 +24,8 @@ type MysqlDaemon interface {
Cnf() *Mycnf
// methods related to mysql running or not
Start(mysqlWaitTime time.Duration) error
Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error
Start(ctx context.Context) error
Shutdown(ctx context.Context, waitForMysqld bool) error
RunMysqlUpgrade() error
// GetMysqlPort returns the current port mysql is listening on.
@ -197,7 +197,7 @@ func (fmd *FakeMysqlDaemon) Cnf() *Mycnf {
}
// Start is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Start(mysqlWaitTime time.Duration) error {
func (fmd *FakeMysqlDaemon) Start(ctx context.Context) error {
if fmd.Running {
return fmt.Errorf("fake mysql daemon already running")
}
@ -206,7 +206,7 @@ func (fmd *FakeMysqlDaemon) Start(mysqlWaitTime time.Duration) error {
}
// Shutdown is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error {
func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, waitForMysqld bool) error {
if !fmd.Running {
return fmt.Errorf("fake mysql daemon not running")
}

Просмотреть файл

@ -12,20 +12,22 @@ import (
"time"
log "github.com/golang/glog"
"golang.org/x/net/context"
)
var mysqlctlClientProtocol = flag.String("mysqlctl_client_protocol", "gorpc", "the protocol to use to talk to the mysqlctl server")
var protocol = flag.String("mysqlctl_client_protocol", "gorpc", "the protocol to use to talk to the mysqlctl server")
var connectionTimeout = flag.Duration("mysqlctl_client_connection_timeout", 30*time.Second, "the connection timeout to use to talk to the mysqlctl server")
// MysqlctlClient defines the interface used to send remote mysqlctl commands
type MysqlctlClient interface {
// Start calls Mysqld.Start remotely.
Start(mysqlWaitTime time.Duration) error
Start(ctx context.Context) error
// Shutdown calls Mysqld.Shutdown remotely.
Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error
Shutdown(ctx context.Context, waitForMysqld bool) error
// RunMysqlUpgrade calls Mysqld.RunMysqlUpgrade remotely.
RunMysqlUpgrade() error
RunMysqlUpgrade(ctx context.Context) error
// Close will terminate the connection. This object won't be used anymore.
Close()
@ -45,10 +47,10 @@ func RegisterFactory(name string, factory Factory) {
}
// New creates a client implementation as specified by a flag.
func New(network, addr string, dialTimeout time.Duration) (MysqlctlClient, error) {
factory, ok := factories[*mysqlctlClientProtocol]
func New(network, addr string) (MysqlctlClient, error) {
factory, ok := factories[*protocol]
if !ok {
return nil, fmt.Errorf("unknown mysqlctl client protocol: %v", *mysqlctlClientProtocol)
return nil, fmt.Errorf("unknown mysqlctl client protocol: %v", *protocol)
}
return factory(network, addr, dialTimeout)
return factory(network, addr, *connectionTimeout)
}

Просмотреть файл

@ -34,6 +34,7 @@ import (
vtenv "github.com/youtube/vitess/go/vt/env"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/mysqlctl/mysqlctlclient"
"golang.org/x/net/context"
)
const (
@ -126,12 +127,12 @@ func (mysqld *Mysqld) RunMysqlUpgrade() error {
// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.RunMysqlUpgrade() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile, 30*time.Second)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.RunMysqlUpgrade()
return client.RunMysqlUpgrade(context.TODO())
}
// find mysql_upgrade. If not there, we do nothing.
@ -175,16 +176,16 @@ func (mysqld *Mysqld) RunMysqlUpgrade() error {
// Start will start the mysql daemon, either by running the 'mysqld_start'
// hook, or by running mysqld_safe in the background.
// If a mysqlctld address is provided in a flag, Start will run remotely.
func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
func (mysqld *Mysqld) Start(ctx context.Context) error {
// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.Start() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile, mysqlWaitTime)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.Start(mysqlWaitTime)
return client.Start(ctx)
}
var name string
@ -210,7 +211,7 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
cmd := exec.Command(name, arg...)
cmd.Dir = dir
cmd.Env = env
log.Infof("%v mysqlWaitTime:%v %#v", ts, mysqlWaitTime, cmd)
log.Infof("%v %#v", ts, cmd)
stderr, err := cmd.StderrPipe()
if err != nil {
return nil
@ -262,7 +263,13 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
// give it some time to succeed - usually by the time the socket emerges
// we are in good shape
for i := mysqlWaitTime; i >= 0; i -= time.Second {
for {
select {
case <-ctx.Done():
return errors.New(name + ": deadline exceeded waiting for " + mysqld.config.SocketFile)
default:
}
_, statErr := os.Stat(mysqld.config.SocketFile)
if statErr == nil {
// Make sure the socket file isn't stale.
@ -277,7 +284,6 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
log.Infof("%v: sleeping for 1s waiting for socket file %v", ts, mysqld.config.SocketFile)
time.Sleep(time.Second)
}
return errors.New(name + ": deadline exceeded waiting for " + mysqld.config.SocketFile)
}
// Shutdown will stop the mysqld daemon that is running in the background.
@ -287,18 +293,18 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error {
// flushed - on the order of 20-30 minutes.
//
// If a mysqlctld address is provided in a flag, Shutdown will run remotely.
func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error {
func (mysqld *Mysqld) Shutdown(ctx context.Context, waitForMysqld bool) error {
log.Infof("Mysqld.Shutdown")
// Execute as remote action on mysqlctld if requested.
if *socketFile != "" {
log.Infof("executing Mysqld.Shutdown() remotely via mysqlctld server: %v", *socketFile)
client, err := mysqlctlclient.New("unix", *socketFile, mysqlWaitTime)
client, err := mysqlctlclient.New("unix", *socketFile)
if err != nil {
return fmt.Errorf("can't dial mysqlctld: %v", err)
}
defer client.Close()
return client.Shutdown(waitForMysqld, mysqlWaitTime)
return client.Shutdown(ctx, waitForMysqld)
}
// We're shutting down on purpose. We no longer want to be notified when
@ -347,10 +353,17 @@ func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration)
return fmt.Errorf("mysqld_shutdown hook failed: %v", hr.String())
}
// wait for mysqld to really stop. use the sock file as a proxy for that since
// we can't call wait() in a process we didn't start.
// Wait for mysqld to really stop. Use the sock file as a
// proxy for that since we can't call wait() in a process we
// didn't start.
if waitForMysqld {
for i := mysqlWaitTime; i >= 0; i -= time.Second {
for {
select {
case <-ctx.Done():
return errors.New("gave up waiting for mysqld to stop")
default:
}
_, statErr := os.Stat(mysqld.config.SocketFile)
if statErr != nil && os.IsNotExist(statErr) {
return nil
@ -358,7 +371,6 @@ func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration)
log.Infof("Mysqld.Shutdown: sleeping for 1s waiting for socket file %v", mysqld.config.SocketFile)
time.Sleep(time.Second)
}
return errors.New("gave up waiting for mysqld to stop")
}
return nil
}
@ -382,7 +394,7 @@ func execCmd(name string, args, env []string, dir string) (cmd *exec.Cmd, err er
// Init will create the default directory structure for the mysqld process,
// generate / configure a my.cnf file, unpack a skeleton database,
// and create some management tables.
func (mysqld *Mysqld) Init(mysqlWaitTime time.Duration, bootstrapArchive string) error {
func (mysqld *Mysqld) Init(ctx context.Context, bootstrapArchive string) error {
log.Infof("mysqlctl.Init")
err := mysqld.createDirs()
if err != nil {
@ -411,7 +423,7 @@ func (mysqld *Mysqld) Init(mysqlWaitTime time.Duration, bootstrapArchive string)
}
// Start mysqld.
if err = mysqld.Start(mysqlWaitTime); err != nil {
if err = mysqld.Start(ctx); err != nil {
log.Errorf("failed starting, check %v", mysqld.config.ErrorLogPath)
return err
}
@ -501,9 +513,9 @@ func (mysqld *Mysqld) createTopDir(dir string) error {
}
// Teardown will shutdown the running daemon, and delete the root directory.
func (mysqld *Mysqld) Teardown(force bool) error {
func (mysqld *Mysqld) Teardown(ctx context.Context, force bool) error {
log.Infof("mysqlctl.Teardown")
if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil {
if err := mysqld.Shutdown(ctx, true); err != nil {
log.Warningf("failed mysqld shutdown: %v", err.Error())
if !force {
return err