From a3efb4ebd231d40cb568dcf893564be655860100 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Mon, 22 Feb 2016 14:37:09 -0800 Subject: [PATCH 1/5] Support semi-sync without async fallback. When using semi-sync with rpl_semi_sync_master_wait_no_slave=1 and rpl_semi_sync_master_timeout=[effectively infinite], it's necessary to toggle master-side semi-sync at the proper times to prevent slaves from getting stuck (since they have no slaves of their own to ACK). This commit adds a new vttablet flag -enable_semi_sync, which causes the replication-management features of Vitess (e.g. InitShardMaster and various reparent operations) to automatically manage semi-sync settings. With these settings, it becomes (reasonably) safe to reparent away from an unresponsive master, as long as the following can be verified: * All semi-sync slaves have stopped replicating from the old master. * The chosen new master is the farthest ahead in replication. Currently we only allow slaves marked in Vitess terms as "replica" tablets to be semi-sync slaves (i.e. to send ACKs). Vitess slaves marked as "rdonly" will NOT ACK, because they are not eligible to become masters, so we never want them to be the only slave with the latest update. This means when using semi-sync, every shard must have at least 2 live "replica" type tablets (including the master) in order to accept writes. --- config/mycnf/default.cnf | 17 +++++ go/vt/mysqlctl/backup.go | 33 ++++++---- go/vt/mysqlctl/mysql_daemon.go | 12 ++++ go/vt/mysqlctl/query.go | 20 +++++- go/vt/mysqlctl/replication.go | 40 ++++++++++++ go/vt/tabletmanager/restore.go | 7 ++ go/vt/tabletmanager/rpc_replication.go | 89 +++++++++++++++++++++++++- 7 files changed, 204 insertions(+), 14 deletions(-) diff --git a/config/mycnf/default.cnf b/config/mycnf/default.cnf index ce84b7cceb..8697e0a2d2 100644 --- a/config/mycnf/default.cnf +++ b/config/mycnf/default.cnf @@ -63,3 +63,20 @@ transaction-isolation = REPEATABLE-READ # READ-COMMITTED would be better, but mysql 5.1 disables this with statement based replication # READ-UNCOMMITTED might be better lower_case_table_names = 1 + +# Semi-sync replication is required for automated unplanned failover +# (when the master goes away). Here we just load the plugin so it's +# available if desired, but it's disabled at startup. +# +# If the -enable_semi_sync flag is used, VTTablet will enable semi-sync +# at the proper time when replication is set up, or when masters are +# promoted or demoted. +plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so + +# When semi-sync is enabled, don't allow fallback to async +# if you get no ack, or have no slaves. This is necessary to +# prevent alternate futures when doing a failover in response to +# a master that becomes unresponsive. +rpl_semi_sync_master_timeout = 1000000000000000000 +rpl_semi_sync_master_wait_no_slave = 1 + diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 448db69b99..262dfae644 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -214,6 +214,7 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b sourceIsMaster := false readOnly := true var replicationPosition replication.Position + semiSyncMaster, semiSyncSlave := mysqld.SemiSyncEnabled() // see if we need to restart replication after backup logger.Infof("getting current replication status") @@ -225,35 +226,35 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b // keep going if we're the master, might be a degenerate case sourceIsMaster = true default: - return fmt.Errorf("cannot get slave status: %v", err) + return fmt.Errorf("can't get slave status: %v", err) } // get the read-only flag readOnly, err = mysqld.IsReadOnly() if err != nil { - return fmt.Errorf("cannot get read only status: %v", err) + return fmt.Errorf("can't get read-only status: %v", err) } // get the replication position if sourceIsMaster { if !readOnly { - logger.Infof("turning master read-onyl before backup") + logger.Infof("turning master read-only before backup") if err = mysqld.SetReadOnly(true); err != nil { - return fmt.Errorf("cannot get read only status: %v", err) + return fmt.Errorf("can't set read-only status: %v", err) } } replicationPosition, err = mysqld.MasterPosition() if err != nil { - return fmt.Errorf("cannot get master position: %v", err) + return fmt.Errorf("can't get master position: %v", err) } } else { if err = StopSlave(mysqld, hookExtraEnv); err != nil { - return fmt.Errorf("cannot stop slave: %v", err) + return fmt.Errorf("can't stop slave: %v", err) } var slaveStatus replication.Status slaveStatus, err = mysqld.SlaveStatus() if err != nil { - return fmt.Errorf("cannot get slave status: %v", err) + return fmt.Errorf("can't get slave status: %v", err) } replicationPosition = slaveStatus.Position } @@ -262,28 +263,38 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b // shutdown mysqld err = mysqld.Shutdown(ctx, true) if err != nil { - return fmt.Errorf("cannot shutdown mysqld: %v", err) + return fmt.Errorf("can't shutdown mysqld: %v", err) } // get the files to backup fes, err := findFilesTobackup(mysqld.Cnf()) if err != nil { - return fmt.Errorf("cannot find files to backup: %v", err) + return fmt.Errorf("can't find files to backup: %v", err) } logger.Infof("found %v files to backup", len(fes)) // backup everything if err := backupFiles(mysqld, logger, bh, fes, replicationPosition, backupConcurrency); err != nil { - return fmt.Errorf("cannot backup files: %v", err) + return fmt.Errorf("can't backup files: %v", err) } // Try to restart mysqld err = mysqld.Start(ctx) if err != nil { - return fmt.Errorf("cannot restart mysqld: %v", err) + return fmt.Errorf("can't restart mysqld: %v", err) } // Restore original mysqld state that we saved above. + if semiSyncMaster || semiSyncSlave { + // Only do this if one of them was on, since both being off could mean + // the plugin isn't even loaded, and the server variables don't exist. + logger.Infof("restoring semi-sync settings from before backup: master=%v, slave=%v", + semiSyncMaster, semiSyncSlave) + err := mysqld.SetSemiSyncEnabled(semiSyncMaster, semiSyncSlave) + if err != nil { + return err + } + } if slaveStartRequired { logger.Infof("restarting mysql replication") if err := StartSlave(mysqld, hookExtraEnv); err != nil { diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 9c11b9dc05..fb60c1e6d6 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -38,6 +38,8 @@ type MysqlDaemon interface { // replication related methods SlaveStatus() (replication.Status, error) + SetSemiSyncEnabled(master, slave bool) error + SemiSyncEnabled() (master, slave bool) // reparenting related methods ResetReplicationCommands() ([]string, error) @@ -438,3 +440,13 @@ func (fmd *FakeMysqlDaemon) GetAppConnection() (dbconnpool.PoolConnection, error func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) { return dbconnpool.NewDBConnection(&sqldb.ConnParams{Engine: fmd.db.Name}, stats.NewTimings("")) } + +// SetSemiSyncEnabled is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(master, slave bool) error { + return nil +} + +// SemiSyncEnabled is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (master, slave bool) { + return false, false +} diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go index f067078335..d85fd3101d 100644 --- a/go/vt/mysqlctl/query.go +++ b/go/vt/mysqlctl/query.go @@ -62,13 +62,31 @@ func (mysqld *Mysqld) fetchSuperQueryMap(query string) (map[string]string, error return nil, fmt.Errorf("query %#v returned %d column names, expected %d", query, len(qr.Fields), len(qr.Rows[0])) } - rowMap := make(map[string]string) + rowMap := make(map[string]string, len(qr.Rows[0])) for i, value := range qr.Rows[0] { rowMap[qr.Fields[i].Name] = value.String() } return rowMap, nil } +// fetchVariables returns a map from MySQL variable names to variable value +// for variables that match the given pattern. +func (mysqld *Mysqld) fetchVariables(pattern string) (map[string]string, error) { + query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", pattern) + qr, err := mysqld.FetchSuperQuery(query) + if err != nil { + return nil, err + } + if len(qr.Fields) != 2 { + return nil, fmt.Errorf("query %#v returned %d columns, expected 2", query, len(qr.Fields)) + } + varMap := make(map[string]string, len(qr.Rows)) + for _, row := range qr.Rows { + varMap[row[0].String()] = row[1].String() + } + return varMap, nil +} + const masterPasswordStart = " MASTER_PASSWORD = '" const masterPasswordEnd = "',\n" diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 45b599fa6d..d22ab6cc9a 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -346,3 +346,43 @@ func (mysqld *Mysqld) DisableBinlogPlayback() error { } return flavor.DisableBinlogPlayback(mysqld) } + +// SetSemiSyncEnabled enables or disables semi-sync replication for +// master and/or slave mode. +func (mysqld *Mysqld) SetSemiSyncEnabled(master, slave bool) error { + log.Infof("Setting semi-sync mode: master=%v, slave=%v", master, slave) + + // Convert bool to int. + var m, s int + if master { + m = 1 + } + if slave { + s = 1 + } + + err := mysqld.ExecuteSuperQuery( + fmt.Sprintf( + "SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v", + m, s)) + if err != nil { + return fmt.Errorf("can't set semi-sync mode: %v; make sure plugins are loaded in my.cnf", err) + } + return nil +} + +// SemiSyncEnabled returns whether semi-sync is enabled for master or slave. +// If the semi-sync plugin is not loaded, we assume semi-sync is disabled. +func (mysqld *Mysqld) SemiSyncEnabled() (master, slave bool) { + vars, err := mysqld.fetchVariables("rpl_semi_sync_%_enabled") + if err != nil { + return false, false + } + if mval, mok := vars["rpl_semi_sync_master_enabled"]; mok { + master = (mval == "ON") + } + if sval, sok := vars["rpl_semi_sync_slave_enabled"]; sok { + slave = (sval == "ON") + } + return master, slave +} diff --git a/go/vt/tabletmanager/restore.go b/go/vt/tabletmanager/restore.go index 1d583e734a..794e4dd1b0 100644 --- a/go/vt/tabletmanager/restore.go +++ b/go/vt/tabletmanager/restore.go @@ -101,6 +101,13 @@ func (agent *ActionAgent) startReplication(ctx context.Context, pos replication. return fmt.Errorf("Cannot read master tablet %v: %v", si.MasterAlias, err) } + // If using semi-sync, we need to enable it before connecting to master. + if *enableSemiSync { + if err := agent.enableSemiSync(false); err != nil { + return err + } + } + // Set master and start slave. cmds, err = agent.MysqlDaemon.SetMasterCommands(ti.Hostname, int(ti.PortMap["mysql"])) if err != nil { diff --git a/go/vt/tabletmanager/rpc_replication.go b/go/vt/tabletmanager/rpc_replication.go index e827021985..8207ee7973 100644 --- a/go/vt/tabletmanager/rpc_replication.go +++ b/go/vt/tabletmanager/rpc_replication.go @@ -5,6 +5,7 @@ package tabletmanager import ( + "flag" "fmt" "time" @@ -20,6 +21,10 @@ import ( topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" ) +var ( + enableSemiSync = flag.Bool("enable_semi_sync", false, "Enable semi-sync when configuring replication, on master and replica tablets only (rdonly tablets will not ack).") +) + // SlaveStatus returns the replication status // Should be called under RPCWrap. func (agent *ActionAgent) SlaveStatus(ctx context.Context) (*replicationdatapb.Status, error) { @@ -72,6 +77,11 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string, // replication or not (using hook if not). // Should be called under RPCWrapLock. func (agent *ActionAgent) StartSlave(ctx context.Context) error { + if *enableSemiSync { + if err := agent.enableSemiSync(false); err != nil { + return err + } + } return mysqlctl.StartSlave(agent.MysqlDaemon, agent.hookExtraEnv()) } @@ -107,10 +117,16 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) { return "", err } + // If using semi-sync, we need to enable it before going read-write. + if *enableSemiSync { + if err := agent.enableSemiSync(true); err != nil { + return "", err + } + } + // Set the server read-write, from now on we can accept real // client writes. Note that if semi-sync replication is enabled, - // we'll still need some slaves to be able to commit - // transactions. + // we'll still need some slaves to be able to commit transactions. if err := agent.MysqlDaemon.SetReadOnly(false); err != nil { return "", err } @@ -152,6 +168,13 @@ func (agent *ActionAgent) InitSlave(ctx context.Context, parent *topodatapb.Tabl return err } + // If using semi-sync, we need to enable it before connecting to master. + if *enableSemiSync { + if err := agent.enableSemiSync(false); err != nil { + return err + } + } + cmds, err := agent.MysqlDaemon.SetSlavePositionCommands(pos) if err != nil { return err @@ -189,6 +212,13 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (string, error) { return "", fmt.Errorf("disallowQueries failed: %v", err) } + // If using semi-sync, we need to disable master-side. + if *enableSemiSync { + if err := agent.enableSemiSync(false); err != nil { + return "", err + } + } + pos, err := agent.MysqlDaemon.DemoteMaster() if err != nil { return "", err @@ -227,6 +257,13 @@ func (agent *ActionAgent) PromoteSlaveWhenCaughtUp(ctx context.Context, position return "", err } + // If using semi-sync, we need to enable it before going read-write. + if *enableSemiSync { + if err := agent.enableSemiSync(true); err != nil { + return "", err + } + } + if err := agent.MysqlDaemon.SetReadOnly(false); err != nil { return "", err } @@ -265,6 +302,13 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb shouldbeReplicating = true } + // If using semi-sync, we need to enable it before connecting to master. + if *enableSemiSync { + if err := agent.enableSemiSync(false); err != nil { + return err + } + } + // Create the list of commands to set the master cmds := []string{} if wasReplicating { @@ -347,6 +391,13 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) { return "", err } + // If using semi-sync, we need to enable it before going read-write. + if *enableSemiSync { + if err := agent.enableSemiSync(true); err != nil { + return "", err + } + } + // Set the server read-write if err := agent.MysqlDaemon.SetReadOnly(false); err != nil { return "", err @@ -358,3 +409,37 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) { return replication.EncodePosition(pos), nil } + +func (agent *ActionAgent) isMasterEligible() (bool, error) { + switch agent.Tablet().Type { + case topodatapb.TabletType_MASTER, topodatapb.TabletType_REPLICA: + return true, nil + case topodatapb.TabletType_SPARE: + // If we're SPARE, it could be because healthcheck is enabled. + tt, err := topoproto.ParseTabletType(*targetTabletType) + if err != nil { + return false, fmt.Errorf("can't determine if tablet is master-eligible: currently SPARE and no -target_tablet_type flag specified") + } + if tt == topodatapb.TabletType_REPLICA { + return true, nil + } + } + + return false, nil +} + +func (agent *ActionAgent) enableSemiSync(master bool) error { + // Only enable if we're eligible for becoming master (REPLICA type). + // Ineligible slaves (RDONLY) shouldn't ACK because we'll never promote them. + masterEligible, err := agent.isMasterEligible() + if err != nil { + return fmt.Errorf("can't enable semi-sync: %v", err) + } + if !masterEligible { + return nil + } + + // Always enable slave-side since it doesn't hurt to keep it on for a master. + // The master-side needs to be off for a slave, or else it will get stuck. + return agent.MysqlDaemon.SetSemiSyncEnabled(master, true) +} From 6ed85e04ff586d9ea12c11cf97629a498e025077 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 24 Feb 2016 18:01:58 -0800 Subject: [PATCH 2/5] mysqlctl: Check dbaPool connections before using them. For SUPER queries (presumed to be administrative in nature), this will test the connection given to us by the dbaPool before using it. If we get "MySQL server has gone away (errno 2006)", try once to reconnect before giving up. This makes SUPER queries more resilient when mysqld goes away and then comes back. --- go/vt/mysqlctl/query.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go index d85fd3101d..93f1e924ff 100644 --- a/go/vt/mysqlctl/query.go +++ b/go/vt/mysqlctl/query.go @@ -7,11 +7,34 @@ package mysqlctl import ( "fmt" "strings" + "time" log "github.com/golang/glog" + "github.com/youtube/vitess/go/sqldb" "github.com/youtube/vitess/go/sqltypes" + "github.com/youtube/vitess/go/vt/dbconnpool" ) +// getPoolReconnect gets a connection from a pool, tests it, and reconnects if +// it gets errno 2006. +func getPoolReconnect(pool *dbconnpool.ConnectionPool, timeout time.Duration) (dbconnpool.PoolConnection, error) { + conn, err := pool.Get(timeout) + if err != nil { + return conn, err + } + // Run a test query to see if this connection is still good. + if _, err := conn.ExecuteFetch("SELECT 1", 1, false); err != nil { + // If we get "MySQL server has gone away (errno 2006)", try to reconnect. + if sqlErr, ok := err.(*sqldb.SQLError); ok && sqlErr.Number() == 2006 { + if err := conn.Reconnect(); err != nil { + conn.Recycle() + return conn, err + } + } + } + return conn, nil +} + // ExecuteSuperQuery allows the user to execute a query as a super user. func (mysqld *Mysqld) ExecuteSuperQuery(query string) error { return mysqld.ExecuteSuperQueryList([]string{query}) @@ -19,7 +42,7 @@ func (mysqld *Mysqld) ExecuteSuperQuery(query string) error { // ExecuteSuperQueryList alows the user to execute queries as a super user. func (mysqld *Mysqld) ExecuteSuperQueryList(queryList []string) error { - conn, connErr := mysqld.dbaPool.Get(0) + conn, connErr := getPoolReconnect(mysqld.dbaPool, 0) if connErr != nil { return connErr } @@ -35,7 +58,7 @@ func (mysqld *Mysqld) ExecuteSuperQueryList(queryList []string) error { // FetchSuperQuery returns the results of executing a query as a super user. func (mysqld *Mysqld) FetchSuperQuery(query string) (*sqltypes.Result, error) { - conn, connErr := mysqld.dbaPool.Get(0) + conn, connErr := getPoolReconnect(mysqld.dbaPool, 0) if connErr != nil { return nil, connErr } From 44d10d40c46563cdaf23a97843affee5b8ad804c Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 23 Feb 2016 19:32:35 -0800 Subject: [PATCH 3/5] Use semi-sync in integration tests. Some tests had to be modified to not assume that masters without replicas will still accept writes. Also, some tests would create tablets as SPARE without enabling healthcheck. As a result, the tablet has no way of knowing whether it will eventually be a replica or rdonly, and hence can't decide whether to enable semi-sync before starting replication. Healthcheck is basically required now, and definitely will be once we fully switch to vtgate discovery mode. So I've removed cases where tablets were being started SPARE without enabling healthcheck. --- test/backup.py | 1 + test/custom_sharding.py | 46 ++++++++++++++---------------- test/keyspace_util.py | 11 ++++++-- test/mysql_flavor.py | 15 ++++++++++ test/mysqlctl.py | 1 + test/reparent.py | 12 ++------ test/resharding.py | 10 +++---- test/rowcache_invalidator.py | 33 ++++++++++++---------- test/schema.py | 2 ++ test/tablet.py | 12 +++++++- test/tabletmanager.py | 1 + test/vtctld_test.py | 16 ++--------- test/vtgatev2_test.py | 2 -- test/worker.py | 55 ++++++++++++++---------------------- 14 files changed, 110 insertions(+), 107 deletions(-) diff --git a/test/backup.py b/test/backup.py index 5e95b84673..aab37facdd 100755 --- a/test/backup.py +++ b/test/backup.py @@ -74,6 +74,7 @@ class TestBackup(unittest.TestCase): environment.topo_server().wipe() for t in [tablet_master, tablet_replica1, tablet_replica2]: t.reset_replication() + t.set_semi_sync_enabled(master=False) t.clean_dbs() _create_vt_insert_test = '''create table vt_insert_test ( diff --git a/test/custom_sharding.py b/test/custom_sharding.py index a70d0edfac..ceb34bcecb 100755 --- a/test/custom_sharding.py +++ b/test/custom_sharding.py @@ -15,24 +15,24 @@ import environment import tablet import utils -# shards +# shards need at least 1 replica for semi-sync ACK, and 1 rdonly for SplitQuery. shard_0_master = tablet.Tablet() +shard_0_replica = tablet.Tablet() shard_0_rdonly = tablet.Tablet() shard_1_master = tablet.Tablet() +shard_1_replica = tablet.Tablet() shard_1_rdonly = tablet.Tablet() +all_tablets = [shard_0_master, shard_0_replica, shard_0_rdonly, + shard_1_master, shard_1_replica, shard_1_rdonly] + def setUpModule(): try: environment.topo_server().setup() - setup_procs = [ - shard_0_master.init_mysql(), - shard_0_rdonly.init_mysql(), - shard_1_master.init_mysql(), - shard_1_rdonly.init_mysql(), - ] + setup_procs = [t.init_mysql() for t in all_tablets] utils.Vtctld().start() utils.VtGate().start() utils.wait_procs(setup_procs) @@ -46,22 +46,15 @@ def tearDownModule(): if utils.options.skip_teardown: return - teardown_procs = [ - shard_0_master.teardown_mysql(), - shard_0_rdonly.teardown_mysql(), - shard_1_master.teardown_mysql(), - shard_1_rdonly.teardown_mysql(), - ] + teardown_procs = [t.teardown_mysql() for t in all_tablets] utils.wait_procs(teardown_procs, raise_on_error=False) environment.topo_server().teardown() utils.kill_sub_processes() utils.remove_tmp_files() - shard_0_master.remove_tree() - shard_0_rdonly.remove_tree() - shard_1_master.remove_tree() - shard_1_rdonly.remove_tree() + for t in all_tablets: + t.remove_tree() class TestCustomSharding(unittest.TestCase): @@ -118,11 +111,12 @@ class TestCustomSharding(unittest.TestCase): # start the first shard only for now shard_0_master.init_tablet('master', 'test_keyspace', '0') + shard_0_replica.init_tablet('replica', 'test_keyspace', '0') shard_0_rdonly.init_tablet('rdonly', 'test_keyspace', '0') - for t in [shard_0_master, shard_0_rdonly]: + for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: t.create_db('vt_test_keyspace') t.start_vttablet(wait_for_state=None) - for t in [shard_0_master, shard_0_rdonly]: + for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: t.wait_for_vttablet_state('SERVING') utils.run_vtctl(['InitShardMaster', 'test_keyspace/0', @@ -143,7 +137,7 @@ primary key (id) auto_log=True) # reload schema everywhere so the QueryService knows about the tables - for t in [shard_0_master, shard_0_rdonly]: + for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True) # insert data on shard 0 @@ -154,10 +148,11 @@ primary key (id) # create shard 1 shard_1_master.init_tablet('master', 'test_keyspace', '1') + shard_1_replica.init_tablet('replica', 'test_keyspace', '1') shard_1_rdonly.init_tablet('rdonly', 'test_keyspace', '1') - for t in [shard_1_master, shard_1_rdonly]: + for t in [shard_1_master, shard_1_replica, shard_1_rdonly]: t.start_vttablet(wait_for_state=None) - for t in [shard_1_master, shard_1_rdonly]: + for t in [shard_1_master, shard_1_replica, shard_1_rdonly]: t.wait_for_vttablet_state('NOT_SERVING') s = utils.run_vtctl_json(['GetShard', 'test_keyspace/1']) self.assertEqual(len(s['served_types']), 3) @@ -166,7 +161,7 @@ primary key (id) shard_1_master.tablet_alias], auto_log=True) utils.run_vtctl(['CopySchemaShard', shard_0_rdonly.tablet_alias, 'test_keyspace/1'], auto_log=True) - for t in [shard_1_master, shard_1_rdonly]: + for t in [shard_1_master, shard_1_replica, shard_1_rdonly]: utils.run_vtctl(['RefreshState', t.tablet_alias], auto_log=True) t.wait_for_vttablet_state('SERVING') @@ -189,7 +184,7 @@ primary key (id) auto_log=True) # reload schema everywhere so the QueryService knows about the tables - for t in [shard_0_master, shard_0_rdonly, shard_1_master, shard_1_rdonly]: + for t in all_tablets: utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True) # insert and read data on all shards @@ -240,7 +235,8 @@ primary key (id) def _check_shards_count_in_srv_keyspace(self, shard_count): ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace']) - check_types = set([topodata_pb2.MASTER, topodata_pb2.RDONLY]) + check_types = set([topodata_pb2.MASTER, topodata_pb2.REPLICA, + topodata_pb2.RDONLY]) for p in ks['partitions']: if p['served_type'] in check_types: self.assertEqual(len(p['shard_references']), shard_count) diff --git a/test/keyspace_util.py b/test/keyspace_util.py index 691621020d..345a6244ea 100644 --- a/test/keyspace_util.py +++ b/test/keyspace_util.py @@ -21,7 +21,12 @@ class TestEnv(object): self.tablet_map = {} def launch( - self, keyspace, shards=None, replica_count=0, rdonly_count=0, ddls=None): + self, keyspace, shards=None, replica_count=1, rdonly_count=0, ddls=None): + """Launch test environment.""" + + if replica_count < 1: + raise Exception('replica_count=%d < 1; tests now use semi-sync' + ' and must have at least one replica' % replica_count) self.tablets = [] utils.run_vtctl(['CreateKeyspace', keyspace]) if not shards or shards[0] == '0': @@ -52,8 +57,6 @@ class TestEnv(object): if t.tablet_type == 'master': utils.run_vtctl(['InitShardMaster', keyspace+'/'+t.shard, t.tablet_alias], auto_log=True) - # Force read-write even if there are no replicas. - utils.run_vtctl(['SetReadWrite', t.tablet_alias], auto_log=True) for ddl in ddls: fname = os.path.join(environment.tmproot, 'ddl.sql') @@ -70,6 +73,8 @@ class TestEnv(object): t.remove_tree() def _start_tablet(self, keyspace, shard, tablet_type, index): + """Start a tablet.""" + t = tablet.Tablet() self.tablets.append(t) if tablet_type == 'master': diff --git a/test/mysql_flavor.py b/test/mysql_flavor.py index 0ecf1907ae..a6856057ef 100644 --- a/test/mysql_flavor.py +++ b/test/mysql_flavor.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +"""Define abstractions for various MySQL flavors.""" import environment import logging @@ -28,6 +29,15 @@ class MysqlFlavor(object): def change_master_commands(self, host, port, pos): raise NotImplementedError() + def set_semi_sync_enabled_commands(self, master=None, slave=None): + """Returns commands to turn semi-sync on/off.""" + cmds = [] + if master is not None: + cmds.append("SET GLOBAL rpl_semi_sync_master_enabled = %d" % master) + if slave is not None: + cmds.append("SET GLOBAL rpl_semi_sync_slave_enabled = %d" % master) + return cmds + def extra_my_cnf(self): """Returns the path to an extra my_cnf file, or None.""" return None @@ -157,6 +167,11 @@ def mysql_flavor(): def set_mysql_flavor(flavor): + """Set the object that will be returned by mysql_flavor(). + + If flavor is not specified, set it based on MYSQL_FLAVOR environment variable. + """ + global __mysql_flavor if not flavor: diff --git a/test/mysqlctl.py b/test/mysqlctl.py index ad4e30ff1c..c687b3acc0 100755 --- a/test/mysqlctl.py +++ b/test/mysqlctl.py @@ -64,6 +64,7 @@ class TestMysqlctl(unittest.TestCase): tablet.Tablet.check_vttablet_count() for t in [master_tablet, replica_tablet]: t.reset_replication() + t.set_semi_sync_enabled(master=False) t.clean_dbs() def test_mysqlctl_restart(self): diff --git a/test/reparent.py b/test/reparent.py index 67dba4de60..5e8ff5e284 100755 --- a/test/reparent.py +++ b/test/reparent.py @@ -67,6 +67,7 @@ class TestReparent(unittest.TestCase): environment.topo_server().wipe() for t in [tablet_62344, tablet_62044, tablet_41983, tablet_31981]: t.reset_replication() + t.set_semi_sync_enabled(master=False) t.clean_dbs() super(TestReparent, self).tearDown() @@ -320,13 +321,7 @@ class TestReparent(unittest.TestCase): self._check_master_cell('test_nj', shard_id, 'test_nj') self._check_master_cell('test_ny', shard_id, 'test_nj') - # Convert two replica to spare. That should leave only one node - # serving traffic, but still needs to appear in the replication - # graph. - utils.run_vtctl(['ChangeSlaveType', tablet_41983.tablet_alias, 'spare']) - utils.run_vtctl(['ChangeSlaveType', tablet_31981.tablet_alias, 'spare']) utils.validate_topology() - self._check_db_addr(shard_id, 'replica', tablet_62044.port) # Run this to make sure it succeeds. utils.run_vtctl(['ShardReplicationPositions', 'test_keyspace/' + shard_id], @@ -569,13 +564,12 @@ class TestReparent(unittest.TestCase): wait_for_start=False) tablet_31981.init_tablet('replica', 'test_keyspace', shard_id, start=True, wait_for_start=False) - tablet_41983.init_tablet('spare', 'test_keyspace', shard_id, start=True, + tablet_41983.init_tablet('replica', 'test_keyspace', shard_id, start=True, wait_for_start=False) # wait for all tablets to start - for t in [tablet_62344, tablet_62044, tablet_31981]: + for t in [tablet_62344, tablet_62044, tablet_31981, tablet_41983]: t.wait_for_vttablet_state('SERVING') - tablet_41983.wait_for_vttablet_state('NOT_SERVING') # Recompute the shard layout node - until you do that, it might not be # valid. diff --git a/test/resharding.py b/test/resharding.py index 4dfcb72801..5870b4be6d 100755 --- a/test/resharding.py +++ b/test/resharding.py @@ -472,7 +472,7 @@ primary key (name) shard_0_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '-80') shard_1_master.init_tablet('master', 'test_keyspace', '80-') shard_1_slave1.init_tablet('replica', 'test_keyspace', '80-') - shard_1_slave2.init_tablet('spare', 'test_keyspace', '80-') + shard_1_slave2.init_tablet('replica', 'test_keyspace', '80-') shard_1_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '80-') shard_1_rdonly1.init_tablet('rdonly', 'test_keyspace', '80-') @@ -497,7 +497,7 @@ primary key (name) shard_0_ny_rdonly.wait_for_vttablet_state('SERVING') shard_1_master.wait_for_vttablet_state('SERVING') shard_1_slave1.wait_for_vttablet_state('SERVING') - shard_1_slave2.wait_for_vttablet_state('NOT_SERVING') # spare + shard_1_slave2.wait_for_vttablet_state('SERVING') shard_1_ny_rdonly.wait_for_vttablet_state('SERVING') shard_1_rdonly1.wait_for_vttablet_state('SERVING') @@ -521,10 +521,10 @@ primary key (name) # create the split shards shard_2_master.init_tablet('master', 'test_keyspace', '80-c0') - shard_2_replica1.init_tablet('spare', 'test_keyspace', '80-c0') - shard_2_replica2.init_tablet('spare', 'test_keyspace', '80-c0') + shard_2_replica1.init_tablet('replica', 'test_keyspace', '80-c0') + shard_2_replica2.init_tablet('replica', 'test_keyspace', '80-c0') shard_3_master.init_tablet('master', 'test_keyspace', 'c0-') - shard_3_replica.init_tablet('spare', 'test_keyspace', 'c0-') + shard_3_replica.init_tablet('replica', 'test_keyspace', 'c0-') shard_3_rdonly1.init_tablet('rdonly', 'test_keyspace', 'c0-') # start vttablet on the split shards (no db created, diff --git a/test/rowcache_invalidator.py b/test/rowcache_invalidator.py index d9e31e154c..0c91288ef4 100755 --- a/test/rowcache_invalidator.py +++ b/test/rowcache_invalidator.py @@ -19,6 +19,11 @@ warnings.simplefilter('ignore') master_tablet = tablet.Tablet() replica_tablet = tablet.Tablet() +# Second replica to provide semi-sync ACKs while testing +# scenarios when the first replica is down. +replica2_tablet = tablet.Tablet() + +all_tablets = [master_tablet, replica_tablet, replica2_tablet] create_vt_insert_test = '''create table vt_insert_test ( id bigint auto_increment, @@ -32,9 +37,7 @@ def setUpModule(): environment.topo_server().setup() # start mysql instance external to the test - setup_procs = [master_tablet.init_mysql(), - replica_tablet.init_mysql()] - utils.wait_procs(setup_procs) + utils.wait_procs([t.init_mysql() for t in all_tablets]) # start a vtctld so the vtctl insert commands are just RPCs, not forks utils.Vtctld().start() @@ -44,15 +47,16 @@ def setUpModule(): utils.run_vtctl(['CreateKeyspace', 'test_keyspace']) master_tablet.init_tablet('master', 'test_keyspace', '0') replica_tablet.init_tablet('replica', 'test_keyspace', '0') + replica2_tablet.init_tablet('replica', 'test_keyspace', '0') utils.validate_topology() - master_tablet.populate('vt_test_keyspace', create_vt_insert_test) - replica_tablet.populate('vt_test_keyspace', create_vt_insert_test) + for t in all_tablets: + t.populate('vt_test_keyspace', create_vt_insert_test) - master_tablet.start_vttablet(memcache=True, wait_for_state=None) - replica_tablet.start_vttablet(memcache=True, wait_for_state=None) - master_tablet.wait_for_vttablet_state('SERVING') - replica_tablet.wait_for_vttablet_state('SERVING') + for t in all_tablets: + t.start_vttablet(memcache=True, wait_for_state=None) + for t in all_tablets: + t.wait_for_vttablet_state('SERVING') utils.run_vtctl(['InitShardMaster', 'test_keyspace/0', master_tablet.tablet_alias], auto_log=True) @@ -71,16 +75,15 @@ def tearDownModule(): if utils.options.skip_teardown: return logging.debug('Tearing down the servers and setup') - tablet.kill_tablets([master_tablet, replica_tablet]) - teardown_procs = [master_tablet.teardown_mysql(), - replica_tablet.teardown_mysql()] - utils.wait_procs(teardown_procs, raise_on_error=False) + tablet.kill_tablets(all_tablets) + utils.wait_procs([t.teardown_mysql() for t in all_tablets], + raise_on_error=False) environment.topo_server().teardown() utils.kill_sub_processes() utils.remove_tmp_files() - master_tablet.remove_tree() - replica_tablet.remove_tree() + for t in all_tablets: + t.remove_tree() class MultiDict(dict): diff --git a/test/schema.py b/test/schema.py index fa59c392fd..86fe067bf4 100755 --- a/test/schema.py +++ b/test/schema.py @@ -118,6 +118,8 @@ def _teardown_shard_2(): ['DeleteShard', '-recursive', 'test_keyspace/2'], auto_log=True) for t in shard_2_tablets: + t.reset_replication() + t.set_semi_sync_enabled(master=False) t.clean_dbs() diff --git a/test/tablet.py b/test/tablet.py index 9c6a1c2081..077285524f 100644 --- a/test/tablet.py +++ b/test/tablet.py @@ -266,6 +266,12 @@ class Tablet(object): def reset_replication(self): self.mquery('', mysql_flavor().reset_replication_commands()) + def set_semi_sync_enabled(self, master=None, slave=None): + logging.debug('mysql(%s): setting semi-sync mode: master=%s, slave=%s', + self.tablet_uid, master, slave) + self.mquery('', + mysql_flavor().set_semi_sync_enabled_commands(master, slave)) + def populate(self, dbname, create_sql, insert_sqls=None): self.create_db(dbname) if isinstance(create_sql, basestring): @@ -342,6 +348,8 @@ class Tablet(object): tablet_index=None, start=False, dbname=None, parent=True, wait_for_start=True, include_mysql_port=True, **kwargs): + """Initialize a tablet's record in topology.""" + self.tablet_type = tablet_type self.keyspace = keyspace self.shard = shard @@ -399,7 +407,7 @@ class Tablet(object): extra_args=None, extra_env=None, include_mysql_port=True, init_tablet_type=None, init_keyspace=None, init_shard=None, init_db_name_override=None, - supports_backups=False, grace_period='1s'): + supports_backups=False, grace_period='1s', enable_semi_sync=True): """Starts a vttablet process, and returns it. The process is also saved in self.proc, so it's easy to kill as well. @@ -422,6 +430,8 @@ class Tablet(object): args.extend(['-binlog_player_healthcheck_retry_delay', '1s']) args.extend(['-binlog_player_retry_delay', '1s']) args.extend(['-pid_file', os.path.join(self.tablet_dir, 'vttablet.pid')]) + if enable_semi_sync: + args.append('-enable_semi_sync') if self.use_mysqlctld: args.extend( ['-mysqlctl_socket', os.path.join(self.tablet_dir, 'mysqlctl.sock')]) diff --git a/test/tabletmanager.py b/test/tabletmanager.py index 24858f8f9c..9d1666b8b0 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -66,6 +66,7 @@ class TestTabletManager(unittest.TestCase): environment.topo_server().wipe() for t in [tablet_62344, tablet_62044]: t.reset_replication() + t.set_semi_sync_enabled(master=False) t.clean_dbs() def _check_srv_shard(self): diff --git a/test/vtctld_test.py b/test/vtctld_test.py index bb821efab3..55e824f261 100755 --- a/test/vtctld_test.py +++ b/test/vtctld_test.py @@ -15,13 +15,11 @@ import utils # range '' - 80 shard_0_master = tablet.Tablet() shard_0_replica = tablet.Tablet() -shard_0_spare = tablet.Tablet() # range 80 - '' shard_1_master = tablet.Tablet() shard_1_replica = tablet.Tablet() # all tablets -tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica, - shard_0_spare] +tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica] def setUpModule(): @@ -67,8 +65,7 @@ class TestVtctld(unittest.TestCase): 'redirected_keyspace']) shard_0_master.init_tablet('master', 'test_keyspace', '-80') - shard_0_replica.init_tablet('spare', 'test_keyspace', '-80') - shard_0_spare.init_tablet('spare', 'test_keyspace', '-80') + shard_0_replica.init_tablet('replica', 'test_keyspace', '-80') shard_1_master.init_tablet('master', 'test_keyspace', '80-') shard_1_replica.init_tablet('replica', 'test_keyspace', '80-') @@ -86,18 +83,11 @@ class TestVtctld(unittest.TestCase): target_tablet_type='replica', wait_for_state=None) - shard_0_spare.start_vttablet(wait_for_state=None, - extra_args=utils.vtctld.process_args()) - # wait for the right states for t in [shard_0_master, shard_1_master, shard_1_replica]: t.wait_for_vttablet_state('SERVING') - for t in [shard_0_replica, shard_0_spare]: - t.wait_for_vttablet_state('NOT_SERVING') + shard_0_replica.wait_for_vttablet_state('NOT_SERVING') - for t in [shard_0_master, shard_0_replica, shard_0_spare, - shard_1_master, shard_1_replica]: - t.reset_replication() utils.run_vtctl(['InitShardMaster', 'test_keyspace/-80', shard_0_master.tablet_alias], auto_log=True) utils.run_vtctl(['InitShardMaster', 'test_keyspace/80-', diff --git a/test/vtgatev2_test.py b/test/vtgatev2_test.py index 28240fd795..e04d293d1c 100755 --- a/test/vtgatev2_test.py +++ b/test/vtgatev2_test.py @@ -829,8 +829,6 @@ class TestFailures(BaseTestCase): self.master_tablet = shard_1_master self.master_tablet.kill_vttablet() self.tablet_start(self.master_tablet, 'replica') - utils.run_vtctl(['InitShardMaster', KEYSPACE_NAME+'/-80', - shard_0_master.tablet_alias], auto_log=True) self.master_tablet.wait_for_vttablet_state('SERVING') self.replica_tablet = shard_1_replica1 self.replica_tablet.kill_vttablet() diff --git a/test/worker.py b/test/worker.py index de21198d7c..48491b11e3 100755 --- a/test/worker.py +++ b/test/worker.py @@ -384,6 +384,7 @@ class TestBaseSplitClone(unittest.TestCase): for shard_tablet in [all_shard_tablets, shard_0_tablets, shard_1_tablets]: for t in shard_tablet.all_tablets: t.reset_replication() + t.set_semi_sync_enabled(master=False) t.clean_dbs() t.kill_vttablet() # we allow failures here as some tablets will be gone sometimes @@ -420,12 +421,18 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone): 6. Verify that the data was copied successfully to both new shards Args: - mysql_down: boolean, True iff we expect the MySQL instances on the - destination masters to be down. + mysql_down: boolean. If True, we take down the MySQL instances on the + destination masters at first, then bring them back and reparent away. Raises: AssertionError if things didn't go as expected. """ + if mysql_down: + logging.debug('Shutting down mysqld on destination masters.') + utils.wait_procs( + [shard_0_master.shutdown_mysql(), + shard_1_master.shutdown_mysql()]) + worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg( ['--cell', 'test_nj'], auto_log=True) @@ -450,12 +457,21 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone): "expected vtworker to retry, but it didn't") logging.debug('Worker has resolved at least twice, starting reparent now') - # Original masters have no running MySQL, so need to force the reparent. + # Bring back masters. Since we test with semi-sync now, we need at least + # one replica for the new master. This test is already quite expensive, + # so we bring back the old master as a replica rather than having a third + # replica up the whole time. + logging.debug('Restarting mysqld on destination masters') + utils.wait_procs( + [shard_0_master.start_mysql(), + shard_1_master.start_mysql()]) + + # Reparent away from the old masters. utils.run_vtctl( - ['EmergencyReparentShard', 'test_keyspace/-80', + ['PlannedReparentShard', 'test_keyspace/-80', shard_0_replica.tablet_alias], auto_log=True) utils.run_vtctl( - ['EmergencyReparentShard', 'test_keyspace/80-', + ['PlannedReparentShard', 'test_keyspace/80-', shard_1_replica.tablet_alias], auto_log=True) else: @@ -523,35 +539,6 @@ class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency): class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency): - def setUp(self): - """Shuts down MySQL on the destination masters. - - Also runs base setup. - """ - try: - logging.debug('Starting base setup for MysqlDownDuringWorkerCopy') - super(TestMysqlDownDuringWorkerCopy, self).setUp() - - logging.debug('Starting MysqlDownDuringWorkerCopy-specific setup') - utils.wait_procs( - [shard_0_master.shutdown_mysql(), - shard_1_master.shutdown_mysql()]) - logging.debug('Finished MysqlDownDuringWorkerCopy-specific setup') - except: - self.tearDown() - raise - - def tearDown(self): - """Restarts the MySQL processes that were killed during the setup.""" - logging.debug('Starting MysqlDownDuringWorkerCopy-specific tearDown') - utils.wait_procs( - [shard_0_master.start_mysql(), - shard_1_master.start_mysql()]) - logging.debug('Finished MysqlDownDuringWorkerCopy-specific tearDown') - - super(TestMysqlDownDuringWorkerCopy, self).tearDown() - logging.debug('Finished base tearDown for MysqlDownDuringWorkerCopy') - def test_mysql_down_during_worker_copy(self): """This test simulates MySQL being down on the destination masters.""" self.verify_successful_worker_copy_with_reparent(mysql_down=True) From 3892568ff7f70d47c7394c868c2bf6c36a178275 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 24 Feb 2016 18:10:10 -0800 Subject: [PATCH 4/5] Use semi-sync in examples. --- examples/kubernetes/vttablet-pod-template.yaml | 1 + examples/local/vttablet-up.sh | 1 + 2 files changed, 2 insertions(+) diff --git a/examples/kubernetes/vttablet-pod-template.yaml b/examples/kubernetes/vttablet-pod-template.yaml index 041341459e..535c4d1acb 100644 --- a/examples/kubernetes/vttablet-pod-template.yaml +++ b/examples/kubernetes/vttablet-pod-template.yaml @@ -76,6 +76,7 @@ spec: -db-config-filtered-dbname vt_{{keyspace}} -db-config-filtered-charset utf8 -enable-rowcache + -enable_semi_sync -rowcache-bin /usr/bin/memcached -rowcache-socket $VTDATAROOT/{{tablet_subdir}}/memcache.sock -restore_from_backup {{backup_flags}}" vitess diff --git a/examples/local/vttablet-up.sh b/examples/local/vttablet-up.sh index 3bdb560ba6..6c82c81e87 100755 --- a/examples/local/vttablet-up.sh +++ b/examples/local/vttablet-up.sh @@ -94,6 +94,7 @@ for uid_index in $uids; do -target_tablet_type $tablet_type \ -health_check_interval 5s \ -enable-rowcache \ + -enable_semi_sync \ -rowcache-bin $memcached_path \ -rowcache-socket $VTDATAROOT/$tablet_dir/memcache.sock \ -backup_storage_implementation file \ From 43b007dbb826319c812c9b81e8d93c786bd78933 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 25 Feb 2016 11:59:38 -0800 Subject: [PATCH 5/5] Test semi-sync toggling in wrangler/testlib. --- go/vt/mysqlctl/mysql_daemon.go | 9 +++++- go/vt/tabletmanager/restore.go | 2 +- go/vt/tabletmanager/rpc_replication.go | 17 +++++------ .../testlib/emergency_reparent_shard_test.go | 2 ++ .../testlib/init_shard_master_test.go | 2 ++ .../testlib/planned_reparent_shard_test.go | 3 ++ go/vt/wrangler/testlib/reparent_utils_test.go | 1 + go/vt/wrangler/testlib/semi_sync_test.go | 28 +++++++++++++++++++ go/vt/wrangler/testlib/vtctl_pipe.go | 6 ++-- 9 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 go/vt/wrangler/testlib/semi_sync_test.go diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index fb60c1e6d6..38d5c82a11 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -194,6 +194,11 @@ type FakeMysqlDaemon struct { // BinlogPlayerEnabled is used by {Enable,Disable}BinlogPlayer BinlogPlayerEnabled bool + + // SemiSyncMasterEnabled represents the state of rpl_semi_sync_master_enabled. + SemiSyncMasterEnabled bool + // SemiSyncSlaveEnabled represents the state of rpl_semi_sync_slave_enabled. + SemiSyncSlaveEnabled bool } // NewFakeMysqlDaemon returns a FakeMysqlDaemon where mysqld appears @@ -443,10 +448,12 @@ func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) // SetSemiSyncEnabled is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(master, slave bool) error { + fmd.SemiSyncMasterEnabled = master + fmd.SemiSyncSlaveEnabled = slave return nil } // SemiSyncEnabled is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (master, slave bool) { - return false, false + return fmd.SemiSyncMasterEnabled, fmd.SemiSyncSlaveEnabled } diff --git a/go/vt/tabletmanager/restore.go b/go/vt/tabletmanager/restore.go index 794e4dd1b0..a8a8ac309b 100644 --- a/go/vt/tabletmanager/restore.go +++ b/go/vt/tabletmanager/restore.go @@ -102,7 +102,7 @@ func (agent *ActionAgent) startReplication(ctx context.Context, pos replication. } // If using semi-sync, we need to enable it before connecting to master. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(false); err != nil { return err } diff --git a/go/vt/tabletmanager/rpc_replication.go b/go/vt/tabletmanager/rpc_replication.go index 8207ee7973..7d1202b7f7 100644 --- a/go/vt/tabletmanager/rpc_replication.go +++ b/go/vt/tabletmanager/rpc_replication.go @@ -22,7 +22,8 @@ import ( ) var ( - enableSemiSync = flag.Bool("enable_semi_sync", false, "Enable semi-sync when configuring replication, on master and replica tablets only (rdonly tablets will not ack).") + // EnableSemiSync is exported for use by vt/wrangler/testlib. + EnableSemiSync = flag.Bool("enable_semi_sync", false, "Enable semi-sync when configuring replication, on master and replica tablets only (rdonly tablets will not ack).") ) // SlaveStatus returns the replication status @@ -77,7 +78,7 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string, // replication or not (using hook if not). // Should be called under RPCWrapLock. func (agent *ActionAgent) StartSlave(ctx context.Context) error { - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(false); err != nil { return err } @@ -118,7 +119,7 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) { } // If using semi-sync, we need to enable it before going read-write. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(true); err != nil { return "", err } @@ -169,7 +170,7 @@ func (agent *ActionAgent) InitSlave(ctx context.Context, parent *topodatapb.Tabl } // If using semi-sync, we need to enable it before connecting to master. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(false); err != nil { return err } @@ -213,7 +214,7 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (string, error) { } // If using semi-sync, we need to disable master-side. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(false); err != nil { return "", err } @@ -258,7 +259,7 @@ func (agent *ActionAgent) PromoteSlaveWhenCaughtUp(ctx context.Context, position } // If using semi-sync, we need to enable it before going read-write. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(true); err != nil { return "", err } @@ -303,7 +304,7 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb } // If using semi-sync, we need to enable it before connecting to master. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(false); err != nil { return err } @@ -392,7 +393,7 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) { } // If using semi-sync, we need to enable it before going read-write. - if *enableSemiSync { + if *EnableSemiSync { if err := agent.enableSemiSync(true); err != nil { return "", err } diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 8142eb733e..73218bd77c 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -138,6 +138,8 @@ func TestEmergencyReparentShard(t *testing.T) { if goodSlave2.FakeMysqlDaemon.Replicating { t.Errorf("goodSlave2.FakeMysqlDaemon.Replicating set") } + checkSemiSyncEnabled(t, true, true, newMaster) + checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2) } // TestEmergencyReparentShardMasterElectNotBest tries to emergency reparent diff --git a/go/vt/wrangler/testlib/init_shard_master_test.go b/go/vt/wrangler/testlib/init_shard_master_test.go index 6868a11039..41926aebed 100644 --- a/go/vt/wrangler/testlib/init_shard_master_test.go +++ b/go/vt/wrangler/testlib/init_shard_master_test.go @@ -122,6 +122,8 @@ func TestInitMasterShard(t *testing.T) { if err := goodSlave2.FakeMysqlDaemon.CheckSuperQueryList(); err != nil { t.Fatalf("goodSlave2.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err) } + checkSemiSyncEnabled(t, true, true, master) + checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2) } // TestInitMasterShardChecks makes sure the safety checks work diff --git a/go/vt/wrangler/testlib/planned_reparent_shard_test.go b/go/vt/wrangler/testlib/planned_reparent_shard_test.go index 4a5964ee7b..a722f67faf 100644 --- a/go/vt/wrangler/testlib/planned_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/planned_reparent_shard_test.go @@ -141,4 +141,7 @@ func TestPlannedReparentShard(t *testing.T) { if goodSlave2.FakeMysqlDaemon.Replicating { t.Errorf("goodSlave2.FakeMysqlDaemon.Replicating set") } + + checkSemiSyncEnabled(t, true, true, newMaster) + checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2, oldMaster) } diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go index a576ffd37a..c743a05ee7 100644 --- a/go/vt/wrangler/testlib/reparent_utils_test.go +++ b/go/vt/wrangler/testlib/reparent_utils_test.go @@ -134,4 +134,5 @@ func TestReparentTablet(t *testing.T) { if err := slave.FakeMysqlDaemon.CheckSuperQueryList(); err != nil { t.Fatalf("slave.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err) } + checkSemiSyncEnabled(t, false, true, slave) } diff --git a/go/vt/wrangler/testlib/semi_sync_test.go b/go/vt/wrangler/testlib/semi_sync_test.go new file mode 100644 index 0000000000..bd9f170322 --- /dev/null +++ b/go/vt/wrangler/testlib/semi_sync_test.go @@ -0,0 +1,28 @@ +// Copyright 2016, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package testlib + +import ( + "testing" + + "github.com/youtube/vitess/go/vt/tabletmanager" + "github.com/youtube/vitess/go/vt/topo/topoproto" +) + +func init() { + // Enable semi-sync for all testlib tests. + *tabletmanager.EnableSemiSync = true +} + +func checkSemiSyncEnabled(t *testing.T, master, slave bool, tablets ...*FakeTablet) { + for _, tablet := range tablets { + if got, want := tablet.FakeMysqlDaemon.SemiSyncMasterEnabled, master; got != want { + t.Errorf("%v: SemiSyncMasterEnabled = %v, want %v", topoproto.TabletAliasString(tablet.Tablet.Alias), got, want) + } + if got, want := tablet.FakeMysqlDaemon.SemiSyncSlaveEnabled, slave; got != want { + t.Errorf("%v: SemiSyncSlaveEnabled = %v, want %v", topoproto.TabletAliasString(tablet.Tablet.Alias), got, want) + } + } +} diff --git a/go/vt/wrangler/testlib/vtctl_pipe.go b/go/vt/wrangler/testlib/vtctl_pipe.go index 64281ee4ab..f8b6add934 100644 --- a/go/vt/wrangler/testlib/vtctl_pipe.go +++ b/go/vt/wrangler/testlib/vtctl_pipe.go @@ -13,12 +13,14 @@ import ( "google.golang.org/grpc" - logutilpb "github.com/youtube/vitess/go/vt/proto/logutil" + "github.com/youtube/vitess/go/vt/logutil" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/vtctl/grpcvtctlserver" "github.com/youtube/vitess/go/vt/vtctl/vtctlclient" "golang.org/x/net/context" + logutilpb "github.com/youtube/vitess/go/vt/proto/logutil" + // we need to import the grpcvtctlclient library so the gRPC // vtctl client is registered and can be used. _ "github.com/youtube/vitess/go/vt/vtctl/grpcvtctlclient" @@ -80,7 +82,7 @@ func (vp *VtctlPipe) Run(args []string) error { return fmt.Errorf("VtctlPipe.Run() failed: %v", err) } for le := range c { - vp.t.Logf(le.String()) + vp.t.Logf(logutil.EventString(le)) } return errFunc() }