From 374bb645a6f3b9274c1fbccc82a0311f8fff1f5e Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Wed, 3 Jun 2015 14:21:39 -0700 Subject: [PATCH] Now using context in mysqlctld communication, and in mysql Start/Shutdown. --- go/cmd/mysqlctl/mysqlctl.go | 18 +++++-- go/cmd/mysqlctld/mysqlctld.go | 10 ++-- go/vt/mysqlctl/backup.go | 21 ++++++-- go/vt/mysqlctl/gorpcmysqlctlclient/client.go | 30 ++++++++---- go/vt/mysqlctl/gorpcmysqlctlserver/server.go | 18 ++++++- go/vt/mysqlctl/mysql_daemon.go | 8 ++-- go/vt/mysqlctl/mysqlctlclient/interface.go | 18 +++---- go/vt/mysqlctl/mysqld.go | 50 ++++++++++++-------- 8 files changed, 121 insertions(+), 52 deletions(-) diff --git a/go/cmd/mysqlctl/mysqlctl.go b/go/cmd/mysqlctl/mysqlctl.go index 241ca6bd33..005be6c58e 100644 --- a/go/cmd/mysqlctl/mysqlctl.go +++ b/go/cmd/mysqlctl/mysqlctl.go @@ -17,6 +17,7 @@ import ( "github.com/youtube/vitess/go/vt/logutil" "github.com/youtube/vitess/go/vt/mysqlctl" myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto" + "golang.org/x/net/context" // import mysql to register mysql connection function _ "github.com/youtube/vitess/go/mysql" @@ -36,7 +37,9 @@ func initCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) err bootstrapArchive := subFlags.String("bootstrap_archive", "mysql-db-dir.tbz", "name of bootstrap archive within vitess/data/bootstrap directory") subFlags.Parse(args) - if err := mysqld.Init(*waitTime, *bootstrapArchive); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), *waitTime) + defer cancel() + if err := mysqld.Init(ctx, *bootstrapArchive); err != nil { return fmt.Errorf("failed init mysql: %v", err) } return nil @@ -46,7 +49,9 @@ func shutdownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown") subFlags.Parse(args) - if err := mysqld.Shutdown(true, *waitTime); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), *waitTime) + defer cancel() + if err := mysqld.Shutdown(ctx, true); err != nil { return fmt.Errorf("failed shutdown mysql: %v", err) } return nil @@ -56,17 +61,22 @@ func startCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) er waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup") subFlags.Parse(args) - if err := mysqld.Start(*waitTime); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), *waitTime) + defer cancel() + if err := mysqld.Start(ctx); err != nil { return fmt.Errorf("failed start mysql: %v", err) } return nil } func teardownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) error { + waitTime := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown") force := subFlags.Bool("force", false, "will remove the root directory even if mysqld shutdown fails") subFlags.Parse(args) - if err := mysqld.Teardown(*force); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), *waitTime) + defer cancel() + if err := mysqld.Teardown(ctx, *force); err != nil { return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err) } return nil diff --git a/go/cmd/mysqlctld/mysqlctld.go b/go/cmd/mysqlctld/mysqlctld.go index 4d4128ce03..e4647eff65 100644 --- a/go/cmd/mysqlctld/mysqlctld.go +++ b/go/cmd/mysqlctld/mysqlctld.go @@ -17,6 +17,7 @@ import ( "github.com/youtube/vitess/go/vt/logutil" "github.com/youtube/vitess/go/vt/mysqlctl" "github.com/youtube/vitess/go/vt/servenv" + "golang.org/x/net/context" // import mysql to register mysql connection function _ "github.com/youtube/vitess/go/mysql" @@ -72,13 +73,15 @@ func main() { }) // Start or Init mysqld as needed. + ctx, cancel := context.WithTimeout(context.Background(), *waitTime) if _, err = os.Stat(mycnf.DataDir); os.IsNotExist(err) { log.Infof("mysql data dir (%s) doesn't exist, initializing", mycnf.DataDir) - mysqld.Init(*waitTime, *bootstrapArchive) + mysqld.Init(ctx, *bootstrapArchive) } else { log.Infof("mysql data dir (%s) already exists, starting without init", mycnf.DataDir) - mysqld.Start(*waitTime) + mysqld.Start(ctx) } + cancel() servenv.Init() defer servenv.Close() @@ -86,7 +89,8 @@ func main() { // Take mysqld down with us on SIGTERM before entering lame duck. servenv.OnTerm(func() { log.Infof("mysqlctl received SIGTERM, shutting down mysqld first") - mysqld.Shutdown(false, 0) + ctx := context.Background() + mysqld.Shutdown(ctx, false) }) // Start RPC server and wait for SIGTERM. diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 0ce3d0b665..229bc8525b 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -17,6 +17,7 @@ import ( "sync" log "github.com/golang/glog" + "golang.org/x/net/context" "github.com/youtube/vitess/go/cgzip" "github.com/youtube/vitess/go/sync2" @@ -254,7 +255,10 @@ func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHa logger.Infof("using replication position: %v", replicationPosition) // shutdown mysqld - if err = mysqld.Shutdown(true, MysqlWaitTime); err != nil { + ctx, cancel := context.WithTimeout(context.TODO(), MysqlWaitTime) + err = mysqld.Shutdown(ctx, true) + cancel() + if err != nil { return fmt.Errorf("cannot shutdown mysqld: %v", err) } @@ -271,7 +275,10 @@ func backup(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHa } // Try to restart mysqld - if err := mysqld.Start(MysqlWaitTime); err != nil { + ctx, cancel = context.WithTimeout(context.TODO(), MysqlWaitTime) + err = mysqld.Start(ctx) + cancel() + if err != nil { return fmt.Errorf("cannot restart mysqld: %v", err) } @@ -539,7 +546,10 @@ func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtr } log.Infof("Restore: shutdown mysqld") - if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil { + ctx, cancel := context.WithTimeout(context.TODO(), MysqlWaitTime) + err = mysqld.Shutdown(ctx, true) + cancel() + if err != nil { return proto.ReplicationPosition{}, err } @@ -554,7 +564,10 @@ func Restore(mysqld MysqlDaemon, bucket string, restoreConcurrency int, hookExtr } log.Infof("Restore: restart mysqld") - if err := mysqld.Start(MysqlWaitTime); err != nil { + ctx, cancel = context.WithTimeout(context.TODO(), MysqlWaitTime) + err = mysqld.Start(ctx) + cancel() + if err != nil { return proto.ReplicationPosition{}, err } diff --git a/go/vt/mysqlctl/gorpcmysqlctlclient/client.go b/go/vt/mysqlctl/gorpcmysqlctlclient/client.go index 8cbdbfb0a8..db00356ad8 100644 --- a/go/vt/mysqlctl/gorpcmysqlctlclient/client.go +++ b/go/vt/mysqlctl/gorpcmysqlctlclient/client.go @@ -4,6 +4,7 @@ // Package gorpcmysqlctlclient contains the go rpc version of the mysqlctl // client protocol. +// Since gorpc doesn't forward context deadline, we forward them manually. package gorpcmysqlctlclient import ( @@ -33,21 +34,34 @@ func goRPCMysqlctlClientFactory(network, addr string, dialTimeout time.Duration) } // Start is part of the MysqlctlClient interface. -func (c *goRPCMysqlctlClient) Start(mysqlWaitTime time.Duration) error { - return c.rpcClient.Call(context.TODO(), "MysqlctlServer.Start", &mysqlWaitTime, &rpc.Unused{}) +func (c *goRPCMysqlctlClient) Start(ctx context.Context) error { + var timeout time.Duration + if deadline, ok := ctx.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + if timeout <= 0 { + return fmt.Errorf("deadline exceeded") + } + } + return c.rpcClient.Call(ctx, "MysqlctlServer.Start", &timeout, &rpc.Unused{}) } // Shutdown is part of the MysqlctlClient interface. -func (c *goRPCMysqlctlClient) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error { - if !waitForMysqld { - mysqlWaitTime = 0 +func (c *goRPCMysqlctlClient) Shutdown(ctx context.Context, waitForMysqld bool) error { + var timeout time.Duration + if waitForMysqld { + if deadline, ok := ctx.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + if timeout <= 0 { + return fmt.Errorf("deadline exceeded") + } + } } - return c.rpcClient.Call(context.TODO(), "MysqlctlServer.Shutdown", &mysqlWaitTime, &rpc.Unused{}) + return c.rpcClient.Call(ctx, "MysqlctlServer.Shutdown", &timeout, &rpc.Unused{}) } // RunMysqlUpgrade is part of the MysqlctlClient interface. -func (c *goRPCMysqlctlClient) RunMysqlUpgrade() error { - return c.rpcClient.Call(context.TODO(), "MysqlctlServer.RunMysqlUpgrade", &rpc.Unused{}, &rpc.Unused{}) +func (c *goRPCMysqlctlClient) RunMysqlUpgrade(ctx context.Context) error { + return c.rpcClient.Call(ctx, "MysqlctlServer.RunMysqlUpgrade", &rpc.Unused{}, &rpc.Unused{}) } // Close is part of the MysqlctlClient interface. diff --git a/go/vt/mysqlctl/gorpcmysqlctlserver/server.go b/go/vt/mysqlctl/gorpcmysqlctlserver/server.go index d0fe0fda97..4e99e6157b 100644 --- a/go/vt/mysqlctl/gorpcmysqlctlserver/server.go +++ b/go/vt/mysqlctl/gorpcmysqlctlserver/server.go @@ -24,12 +24,26 @@ type MysqlctlServer struct { // Start implements the server side of the MysqlctlClient interface. func (s *MysqlctlServer) Start(ctx context.Context, args *time.Duration, reply *rpc.Unused) error { - return s.mysqld.Start(*args) + if *args != 0 { + // if a duration was passed in, add it to the Context. + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, *args) + defer cancel() + } + return s.mysqld.Start(ctx) } // Shutdown implements the server side of the MysqlctlClient interface. func (s *MysqlctlServer) Shutdown(ctx context.Context, args *time.Duration, reply *rpc.Unused) error { - return s.mysqld.Shutdown(*args > 0, *args) + waitForMysqld := false + if *args != 0 { + // if a duration was passed in, add it to the Context. + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, *args) + defer cancel() + waitForMysqld = true + } + return s.mysqld.Shutdown(ctx, waitForMysqld) } // RunMysqlUpgrade implements the server side of the MysqlctlClient interface. diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 383e9b6a69..0ffda56481 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -24,8 +24,8 @@ type MysqlDaemon interface { Cnf() *Mycnf // methods related to mysql running or not - Start(mysqlWaitTime time.Duration) error - Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error + Start(ctx context.Context) error + Shutdown(ctx context.Context, waitForMysqld bool) error RunMysqlUpgrade() error // GetMysqlPort returns the current port mysql is listening on. @@ -197,7 +197,7 @@ func (fmd *FakeMysqlDaemon) Cnf() *Mycnf { } // Start is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) Start(mysqlWaitTime time.Duration) error { +func (fmd *FakeMysqlDaemon) Start(ctx context.Context) error { if fmd.Running { return fmt.Errorf("fake mysql daemon already running") } @@ -206,7 +206,7 @@ func (fmd *FakeMysqlDaemon) Start(mysqlWaitTime time.Duration) error { } // Shutdown is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error { +func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, waitForMysqld bool) error { if !fmd.Running { return fmt.Errorf("fake mysql daemon not running") } diff --git a/go/vt/mysqlctl/mysqlctlclient/interface.go b/go/vt/mysqlctl/mysqlctlclient/interface.go index bee57a82ce..0e5e632300 100644 --- a/go/vt/mysqlctl/mysqlctlclient/interface.go +++ b/go/vt/mysqlctl/mysqlctlclient/interface.go @@ -12,20 +12,22 @@ import ( "time" log "github.com/golang/glog" + "golang.org/x/net/context" ) -var mysqlctlClientProtocol = flag.String("mysqlctl_client_protocol", "gorpc", "the protocol to use to talk to the mysqlctl server") +var protocol = flag.String("mysqlctl_client_protocol", "gorpc", "the protocol to use to talk to the mysqlctl server") +var connectionTimeout = flag.Duration("mysqlctl_client_connection_timeout", 30*time.Second, "the connection timeout to use to talk to the mysqlctl server") // MysqlctlClient defines the interface used to send remote mysqlctl commands type MysqlctlClient interface { // Start calls Mysqld.Start remotely. - Start(mysqlWaitTime time.Duration) error + Start(ctx context.Context) error // Shutdown calls Mysqld.Shutdown remotely. - Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error + Shutdown(ctx context.Context, waitForMysqld bool) error // RunMysqlUpgrade calls Mysqld.RunMysqlUpgrade remotely. - RunMysqlUpgrade() error + RunMysqlUpgrade(ctx context.Context) error // Close will terminate the connection. This object won't be used anymore. Close() @@ -45,10 +47,10 @@ func RegisterFactory(name string, factory Factory) { } // New creates a client implementation as specified by a flag. -func New(network, addr string, dialTimeout time.Duration) (MysqlctlClient, error) { - factory, ok := factories[*mysqlctlClientProtocol] +func New(network, addr string) (MysqlctlClient, error) { + factory, ok := factories[*protocol] if !ok { - return nil, fmt.Errorf("unknown mysqlctl client protocol: %v", *mysqlctlClientProtocol) + return nil, fmt.Errorf("unknown mysqlctl client protocol: %v", *protocol) } - return factory(network, addr, dialTimeout) + return factory(network, addr, *connectionTimeout) } diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 21deaf94da..91bfb9ada9 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -34,6 +34,7 @@ import ( vtenv "github.com/youtube/vitess/go/vt/env" "github.com/youtube/vitess/go/vt/hook" "github.com/youtube/vitess/go/vt/mysqlctl/mysqlctlclient" + "golang.org/x/net/context" ) const ( @@ -126,12 +127,12 @@ func (mysqld *Mysqld) RunMysqlUpgrade() error { // Execute as remote action on mysqlctld if requested. if *socketFile != "" { log.Infof("executing Mysqld.RunMysqlUpgrade() remotely via mysqlctld server: %v", *socketFile) - client, err := mysqlctlclient.New("unix", *socketFile, 30*time.Second) + client, err := mysqlctlclient.New("unix", *socketFile) if err != nil { return fmt.Errorf("can't dial mysqlctld: %v", err) } defer client.Close() - return client.RunMysqlUpgrade() + return client.RunMysqlUpgrade(context.TODO()) } // find mysql_upgrade. If not there, we do nothing. @@ -175,16 +176,16 @@ func (mysqld *Mysqld) RunMysqlUpgrade() error { // Start will start the mysql daemon, either by running the 'mysqld_start' // hook, or by running mysqld_safe in the background. // If a mysqlctld address is provided in a flag, Start will run remotely. -func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error { +func (mysqld *Mysqld) Start(ctx context.Context) error { // Execute as remote action on mysqlctld if requested. if *socketFile != "" { log.Infof("executing Mysqld.Start() remotely via mysqlctld server: %v", *socketFile) - client, err := mysqlctlclient.New("unix", *socketFile, mysqlWaitTime) + client, err := mysqlctlclient.New("unix", *socketFile) if err != nil { return fmt.Errorf("can't dial mysqlctld: %v", err) } defer client.Close() - return client.Start(mysqlWaitTime) + return client.Start(ctx) } var name string @@ -210,7 +211,7 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error { cmd := exec.Command(name, arg...) cmd.Dir = dir cmd.Env = env - log.Infof("%v mysqlWaitTime:%v %#v", ts, mysqlWaitTime, cmd) + log.Infof("%v %#v", ts, cmd) stderr, err := cmd.StderrPipe() if err != nil { return nil @@ -262,7 +263,13 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error { // give it some time to succeed - usually by the time the socket emerges // we are in good shape - for i := mysqlWaitTime; i >= 0; i -= time.Second { + for { + select { + case <-ctx.Done(): + return errors.New(name + ": deadline exceeded waiting for " + mysqld.config.SocketFile) + default: + } + _, statErr := os.Stat(mysqld.config.SocketFile) if statErr == nil { // Make sure the socket file isn't stale. @@ -277,7 +284,6 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error { log.Infof("%v: sleeping for 1s waiting for socket file %v", ts, mysqld.config.SocketFile) time.Sleep(time.Second) } - return errors.New(name + ": deadline exceeded waiting for " + mysqld.config.SocketFile) } // Shutdown will stop the mysqld daemon that is running in the background. @@ -287,18 +293,18 @@ func (mysqld *Mysqld) Start(mysqlWaitTime time.Duration) error { // flushed - on the order of 20-30 minutes. // // If a mysqlctld address is provided in a flag, Shutdown will run remotely. -func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) error { +func (mysqld *Mysqld) Shutdown(ctx context.Context, waitForMysqld bool) error { log.Infof("Mysqld.Shutdown") // Execute as remote action on mysqlctld if requested. if *socketFile != "" { log.Infof("executing Mysqld.Shutdown() remotely via mysqlctld server: %v", *socketFile) - client, err := mysqlctlclient.New("unix", *socketFile, mysqlWaitTime) + client, err := mysqlctlclient.New("unix", *socketFile) if err != nil { return fmt.Errorf("can't dial mysqlctld: %v", err) } defer client.Close() - return client.Shutdown(waitForMysqld, mysqlWaitTime) + return client.Shutdown(ctx, waitForMysqld) } // We're shutting down on purpose. We no longer want to be notified when @@ -347,10 +353,17 @@ func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) return fmt.Errorf("mysqld_shutdown hook failed: %v", hr.String()) } - // wait for mysqld to really stop. use the sock file as a proxy for that since - // we can't call wait() in a process we didn't start. + // Wait for mysqld to really stop. Use the sock file as a + // proxy for that since we can't call wait() in a process we + // didn't start. if waitForMysqld { - for i := mysqlWaitTime; i >= 0; i -= time.Second { + for { + select { + case <-ctx.Done(): + return errors.New("gave up waiting for mysqld to stop") + default: + } + _, statErr := os.Stat(mysqld.config.SocketFile) if statErr != nil && os.IsNotExist(statErr) { return nil @@ -358,7 +371,6 @@ func (mysqld *Mysqld) Shutdown(waitForMysqld bool, mysqlWaitTime time.Duration) log.Infof("Mysqld.Shutdown: sleeping for 1s waiting for socket file %v", mysqld.config.SocketFile) time.Sleep(time.Second) } - return errors.New("gave up waiting for mysqld to stop") } return nil } @@ -382,7 +394,7 @@ func execCmd(name string, args, env []string, dir string) (cmd *exec.Cmd, err er // Init will create the default directory structure for the mysqld process, // generate / configure a my.cnf file, unpack a skeleton database, // and create some management tables. -func (mysqld *Mysqld) Init(mysqlWaitTime time.Duration, bootstrapArchive string) error { +func (mysqld *Mysqld) Init(ctx context.Context, bootstrapArchive string) error { log.Infof("mysqlctl.Init") err := mysqld.createDirs() if err != nil { @@ -411,7 +423,7 @@ func (mysqld *Mysqld) Init(mysqlWaitTime time.Duration, bootstrapArchive string) } // Start mysqld. - if err = mysqld.Start(mysqlWaitTime); err != nil { + if err = mysqld.Start(ctx); err != nil { log.Errorf("failed starting, check %v", mysqld.config.ErrorLogPath) return err } @@ -501,9 +513,9 @@ func (mysqld *Mysqld) createTopDir(dir string) error { } // Teardown will shutdown the running daemon, and delete the root directory. -func (mysqld *Mysqld) Teardown(force bool) error { +func (mysqld *Mysqld) Teardown(ctx context.Context, force bool) error { log.Infof("mysqlctl.Teardown") - if err := mysqld.Shutdown(true, MysqlWaitTime); err != nil { + if err := mysqld.Shutdown(ctx, true); err != nil { log.Warningf("failed mysqld shutdown: %v", err.Error()) if !force { return err