Merge pull request #1521 from enisoc/semisync

Support semi-sync without async fallback.
Anthony Yeh 2016-02-25 15:54:04 -08:00
Parent 6dc99de5bd 43b007dbb8
Commit 4e13248a69
29 changed files: 389 additions and 125 deletions

View file

@ -63,3 +63,20 @@ transaction-isolation = REPEATABLE-READ
# READ-COMMITTED would be better, but mysql 5.1 disables this with statement based replication
# READ-UNCOMMITTED might be better
lower_case_table_names = 1
# Semi-sync replication is required for automated unplanned failover
# (when the master goes away). Here we just load the plugin so it's
# available if desired, but it's disabled at startup.
#
# If the -enable_semi_sync flag is used, VTTablet will enable semi-sync
# at the proper time when replication is set up, or when masters are
# promoted or demoted.
plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so
# When semi-sync is enabled, don't allow fallback to async
# if you get no ack, or have no slaves. This is necessary to
# prevent alternate futures when doing a failover in response to
# a master that becomes unresponsive.
rpl_semi_sync_master_timeout = 1000000000000000000
rpl_semi_sync_master_wait_no_slave = 1
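
Note that rpl_semi_sync_master_timeout is measured in milliseconds, so the value above (10^18 ms, roughly 31 million years) makes the timeout effectively infinite, which is what keeps the no-fallback behavior in force. As a minimal sketch (not part of this commit) of how to inspect these settings at runtime, assuming the go-sql-driver/mysql driver and a placeholder DSN:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // registers the "mysql" driver
)

func main() {
	// Placeholder DSN; substitute real credentials and socket/host.
	db, err := sql.Open("mysql", "vt_dba@unix(/path/to/mysql.sock)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Lists the variables created by the plugin-load line above.
	// An empty result means the semi-sync plugins are not loaded.
	rows, err := db.Query("SHOW VARIABLES LIKE 'rpl_semi_sync_%'")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var name, value string
		if err := rows.Scan(&name, &value); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s = %s\n", name, value)
	}
}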

View file

@ -76,6 +76,7 @@ spec:
-db-config-filtered-dbname vt_{{keyspace}}
-db-config-filtered-charset utf8
-enable-rowcache
-enable_semi_sync
-rowcache-bin /usr/bin/memcached
-rowcache-socket $VTDATAROOT/{{tablet_subdir}}/memcache.sock
-restore_from_backup {{backup_flags}}" vitess

View file

@ -94,6 +94,7 @@ for uid_index in $uids; do
-target_tablet_type $tablet_type \
-health_check_interval 5s \
-enable-rowcache \
-enable_semi_sync \
-rowcache-bin $memcached_path \
-rowcache-socket $VTDATAROOT/$tablet_dir/memcache.sock \
-backup_storage_implementation file \

View file

@ -214,6 +214,7 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
sourceIsMaster := false
readOnly := true
var replicationPosition replication.Position
semiSyncMaster, semiSyncSlave := mysqld.SemiSyncEnabled()
// see if we need to restart replication after backup
logger.Infof("getting current replication status")
@ -225,35 +226,35 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
// keep going if we're the master, might be a degenerate case
sourceIsMaster = true
default:
return fmt.Errorf("cannot get slave status: %v", err)
return fmt.Errorf("can't get slave status: %v", err)
}
// get the read-only flag
readOnly, err = mysqld.IsReadOnly()
if err != nil {
return fmt.Errorf("cannot get read only status: %v", err)
return fmt.Errorf("can't get read-only status: %v", err)
}
// get the replication position
if sourceIsMaster {
if !readOnly {
logger.Infof("turning master read-onyl before backup")
logger.Infof("turning master read-only before backup")
if err = mysqld.SetReadOnly(true); err != nil {
return fmt.Errorf("cannot get read only status: %v", err)
return fmt.Errorf("can't set read-only status: %v", err)
}
}
replicationPosition, err = mysqld.MasterPosition()
if err != nil {
return fmt.Errorf("cannot get master position: %v", err)
return fmt.Errorf("can't get master position: %v", err)
}
} else {
if err = StopSlave(mysqld, hookExtraEnv); err != nil {
return fmt.Errorf("cannot stop slave: %v", err)
return fmt.Errorf("can't stop slave: %v", err)
}
var slaveStatus replication.Status
slaveStatus, err = mysqld.SlaveStatus()
if err != nil {
return fmt.Errorf("cannot get slave status: %v", err)
return fmt.Errorf("can't get slave status: %v", err)
}
replicationPosition = slaveStatus.Position
}
@ -262,28 +263,38 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
// shutdown mysqld
err = mysqld.Shutdown(ctx, true)
if err != nil {
return fmt.Errorf("cannot shutdown mysqld: %v", err)
return fmt.Errorf("can't shutdown mysqld: %v", err)
}
// get the files to backup
fes, err := findFilesTobackup(mysqld.Cnf())
if err != nil {
return fmt.Errorf("cannot find files to backup: %v", err)
return fmt.Errorf("can't find files to backup: %v", err)
}
logger.Infof("found %v files to backup", len(fes))
// backup everything
if err := backupFiles(mysqld, logger, bh, fes, replicationPosition, backupConcurrency); err != nil {
return fmt.Errorf("cannot backup files: %v", err)
return fmt.Errorf("can't backup files: %v", err)
}
// Try to restart mysqld
err = mysqld.Start(ctx)
if err != nil {
return fmt.Errorf("cannot restart mysqld: %v", err)
return fmt.Errorf("can't restart mysqld: %v", err)
}
// Restore original mysqld state that we saved above.
if semiSyncMaster || semiSyncSlave {
// Only do this if one of them was on, since both being off could mean
// the plugin isn't even loaded, and the server variables don't exist.
logger.Infof("restoring semi-sync settings from before backup: master=%v, slave=%v",
semiSyncMaster, semiSyncSlave)
err := mysqld.SetSemiSyncEnabled(semiSyncMaster, semiSyncSlave)
if err != nil {
return err
}
}
if slaveStartRequired {
logger.Infof("restarting mysql replication")
if err := StartSlave(mysqld, hookExtraEnv); err != nil {
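
The hunk above follows a save/restore pattern: semi-sync state is captured before mysqld is shut down for the backup and reinstated only after a successful restart, and only if at least one side was on. A condensed sketch of that pattern against the MysqlDaemon interface from this commit (the op callback is a hypothetical stand-in for the shutdown/backup/start sequence):

// saveRestoreSemiSync sketches the pattern used by backup() above.
func saveRestoreSemiSync(mysqld MysqlDaemon, op func() error) error {
	// Capture the current state before doing anything disruptive.
	semiSyncMaster, semiSyncSlave := mysqld.SemiSyncEnabled()
	if err := op(); err != nil {
		return err
	}
	// Only restore if one side was on: both being off could mean the
	// plugin isn't loaded and the server variables don't exist.
	if semiSyncMaster || semiSyncSlave {
		return mysqld.SetSemiSyncEnabled(semiSyncMaster, semiSyncSlave)
	}
	return nil
}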

View file

@ -38,6 +38,8 @@ type MysqlDaemon interface {
// replication related methods
SlaveStatus() (replication.Status, error)
SetSemiSyncEnabled(master, slave bool) error
SemiSyncEnabled() (master, slave bool)
// reparenting related methods
ResetReplicationCommands() ([]string, error)
@ -192,6 +194,11 @@ type FakeMysqlDaemon struct {
// BinlogPlayerEnabled is used by {Enable,Disable}BinlogPlayer
BinlogPlayerEnabled bool
// SemiSyncMasterEnabled represents the state of rpl_semi_sync_master_enabled.
SemiSyncMasterEnabled bool
// SemiSyncSlaveEnabled represents the state of rpl_semi_sync_slave_enabled.
SemiSyncSlaveEnabled bool
}
// NewFakeMysqlDaemon returns a FakeMysqlDaemon where mysqld appears
@ -438,3 +445,15 @@ func (fmd *FakeMysqlDaemon) GetAppConnection() (dbconnpool.PoolConnection, error
func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(&sqldb.ConnParams{Engine: fmd.db.Name}, stats.NewTimings(""))
}
// SetSemiSyncEnabled is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(master, slave bool) error {
fmd.SemiSyncMasterEnabled = master
fmd.SemiSyncSlaveEnabled = slave
return nil
}
// SemiSyncEnabled is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (master, slave bool) {
return fmd.SemiSyncMasterEnabled, fmd.SemiSyncSlaveEnabled
}
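
These fake methods make semi-sync assertions cheap in unit tests. An illustrative (hypothetical) round-trip test in the same package; the real tests build the fake with NewFakeMysqlDaemon, but the zero value suffices for these two methods:

func TestSemiSyncRoundTrip(t *testing.T) {
	fmd := &FakeMysqlDaemon{}
	// SetSemiSyncEnabled only records the values on the fake.
	if err := fmd.SetSemiSyncEnabled(true, false); err != nil {
		t.Fatal(err)
	}
	if master, slave := fmd.SemiSyncEnabled(); !master || slave {
		t.Errorf("SemiSyncEnabled() = %v, %v; want true, false", master, slave)
	}
}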

View file

@ -7,11 +7,34 @@ package mysqlctl
import (
"fmt"
"strings"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/dbconnpool"
)
// getPoolReconnect gets a connection from a pool, tests it, and reconnects if
// it gets errno 2006.
func getPoolReconnect(pool *dbconnpool.ConnectionPool, timeout time.Duration) (dbconnpool.PoolConnection, error) {
conn, err := pool.Get(timeout)
if err != nil {
return conn, err
}
// Run a test query to see if this connection is still good.
if _, err := conn.ExecuteFetch("SELECT 1", 1, false); err != nil {
// If we get "MySQL server has gone away (errno 2006)", try to reconnect.
if sqlErr, ok := err.(*sqldb.SQLError); ok && sqlErr.Number() == 2006 {
if err := conn.Reconnect(); err != nil {
conn.Recycle()
return conn, err
}
}
}
return conn, nil
}
// ExecuteSuperQuery allows the user to execute a query as a super user.
func (mysqld *Mysqld) ExecuteSuperQuery(query string) error {
return mysqld.ExecuteSuperQueryList([]string{query})
@ -19,7 +42,7 @@ func (mysqld *Mysqld) ExecuteSuperQuery(query string) error {
// ExecuteSuperQueryList allows the user to execute queries as a super user.
func (mysqld *Mysqld) ExecuteSuperQueryList(queryList []string) error {
conn, connErr := mysqld.dbaPool.Get(0)
conn, connErr := getPoolReconnect(mysqld.dbaPool, 0)
if connErr != nil {
return connErr
}
@ -35,7 +58,7 @@ func (mysqld *Mysqld) ExecuteSuperQueryList(queryList []string) error {
// FetchSuperQuery returns the results of executing a query as a super user.
func (mysqld *Mysqld) FetchSuperQuery(query string) (*sqltypes.Result, error) {
conn, connErr := mysqld.dbaPool.Get(0)
conn, connErr := getPoolReconnect(mysqld.dbaPool, 0)
if connErr != nil {
return nil, connErr
}
@ -62,13 +85,31 @@ func (mysqld *Mysqld) fetchSuperQueryMap(query string) (map[string]string, error
return nil, fmt.Errorf("query %#v returned %d column names, expected %d", query, len(qr.Fields), len(qr.Rows[0]))
}
rowMap := make(map[string]string)
rowMap := make(map[string]string, len(qr.Rows[0]))
for i, value := range qr.Rows[0] {
rowMap[qr.Fields[i].Name] = value.String()
}
return rowMap, nil
}
// fetchVariables returns a map from MySQL variable names to variable values
// for variables that match the given pattern.
func (mysqld *Mysqld) fetchVariables(pattern string) (map[string]string, error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", pattern)
qr, err := mysqld.FetchSuperQuery(query)
if err != nil {
return nil, err
}
if len(qr.Fields) != 2 {
return nil, fmt.Errorf("query %#v returned %d columns, expected 2", query, len(qr.Fields))
}
varMap := make(map[string]string, len(qr.Rows))
for _, row := range qr.Rows {
varMap[row[0].String()] = row[1].String()
}
return varMap, nil
}
const masterPasswordStart = " MASTER_PASSWORD = '"
const masterPasswordEnd = "',\n"

View file

@ -346,3 +346,43 @@ func (mysqld *Mysqld) DisableBinlogPlayback() error {
}
return flavor.DisableBinlogPlayback(mysqld)
}
// SetSemiSyncEnabled enables or disables semi-sync replication for
// master and/or slave mode.
func (mysqld *Mysqld) SetSemiSyncEnabled(master, slave bool) error {
log.Infof("Setting semi-sync mode: master=%v, slave=%v", master, slave)
// Convert bool to int.
var m, s int
if master {
m = 1
}
if slave {
s = 1
}
err := mysqld.ExecuteSuperQuery(
fmt.Sprintf(
"SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v",
m, s))
if err != nil {
return fmt.Errorf("can't set semi-sync mode: %v; make sure plugins are loaded in my.cnf", err)
}
return nil
}
// SemiSyncEnabled returns whether semi-sync is enabled for master or slave.
// If the semi-sync plugin is not loaded, we assume semi-sync is disabled.
func (mysqld *Mysqld) SemiSyncEnabled() (master, slave bool) {
vars, err := mysqld.fetchVariables("rpl_semi_sync_%_enabled")
if err != nil {
return false, false
}
if mval, mok := vars["rpl_semi_sync_master_enabled"]; mok {
master = (mval == "ON")
}
if sval, sok := vars["rpl_semi_sync_slave_enabled"]; sok {
slave = (sval == "ON")
}
return master, slave
}
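
Together these methods let callers treat semi-sync as state to set and query. The reparent code later in this commit depends on ordering: enable the master side before clearing read-only, so that the first client writes already wait for a replica ACK. A compressed, illustrative sketch of that promote sequence (promoteSketch is not part of the commit):

// promoteSketch shows the ordering the reparent code uses: semi-sync
// master-side first, then accept writes.
func promoteSketch(mysqld *Mysqld) error {
	// The slave side stays on too; it is harmless on a master.
	if err := mysqld.SetSemiSyncEnabled(true, true); err != nil {
		return err
	}
	// Only now allow client writes; each commit waits for an ACK.
	return mysqld.SetReadOnly(false)
}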

View file

@ -101,6 +101,13 @@ func (agent *ActionAgent) startReplication(ctx context.Context, pos replication.
return fmt.Errorf("Cannot read master tablet %v: %v", si.MasterAlias, err)
}
// If using semi-sync, we need to enable it before connecting to master.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
// Set master and start slave.
cmds, err = agent.MysqlDaemon.SetMasterCommands(ti.Hostname, int(ti.PortMap["mysql"]))
if err != nil {

View file

@ -5,6 +5,7 @@
package tabletmanager
import (
"flag"
"fmt"
"time"
@ -20,6 +21,11 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
var (
// EnableSemiSync is exported for use by vt/wrangler/testlib.
EnableSemiSync = flag.Bool("enable_semi_sync", false, "Enable semi-sync when configuring replication, on master and replica tablets only (rdonly tablets will not ack).")
)
// SlaveStatus returns the replication status
// Should be called under RPCWrap.
func (agent *ActionAgent) SlaveStatus(ctx context.Context) (*replicationdatapb.Status, error) {
@ -72,6 +78,11 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string,
// replication or not (using hook if not).
// Should be called under RPCWrapLock.
func (agent *ActionAgent) StartSlave(ctx context.Context) error {
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
return mysqlctl.StartSlave(agent.MysqlDaemon, agent.hookExtraEnv())
}
@ -107,10 +118,16 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) {
return "", err
}
// If using semi-sync, we need to enable it before going read-write.
if *EnableSemiSync {
if err := agent.enableSemiSync(true); err != nil {
return "", err
}
}
// Set the server read-write, from now on we can accept real
// client writes. Note that if semi-sync replication is enabled,
// we'll still need some slaves to be able to commit
// transactions.
// we'll still need some slaves to be able to commit transactions.
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
@ -152,6 +169,13 @@ func (agent *ActionAgent) InitSlave(ctx context.Context, parent *topodatapb.Tabl
return err
}
// If using semi-sync, we need to enable it before connecting to master.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
cmds, err := agent.MysqlDaemon.SetSlavePositionCommands(pos)
if err != nil {
return err
@ -189,6 +213,13 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (string, error) {
return "", fmt.Errorf("disallowQueries failed: %v", err)
}
// If using semi-sync, we need to disable master-side.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return "", err
}
}
pos, err := agent.MysqlDaemon.DemoteMaster()
if err != nil {
return "", err
@ -227,6 +258,13 @@ func (agent *ActionAgent) PromoteSlaveWhenCaughtUp(ctx context.Context, position
return "", err
}
// If using semi-sync, we need to enable it before going read-write.
if *EnableSemiSync {
if err := agent.enableSemiSync(true); err != nil {
return "", err
}
}
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
@ -265,6 +303,13 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb
shouldbeReplicating = true
}
// If using semi-sync, we need to enable it before connecting to master.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
// Create the list of commands to set the master
cmds := []string{}
if wasReplicating {
@ -347,6 +392,13 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) {
return "", err
}
// If using semi-sync, we need to enable it before going read-write.
if *EnableSemiSync {
if err := agent.enableSemiSync(true); err != nil {
return "", err
}
}
// Set the server read-write
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
@ -358,3 +410,37 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) {
return replication.EncodePosition(pos), nil
}
func (agent *ActionAgent) isMasterEligible() (bool, error) {
switch agent.Tablet().Type {
case topodatapb.TabletType_MASTER, topodatapb.TabletType_REPLICA:
return true, nil
case topodatapb.TabletType_SPARE:
// If we're SPARE, it could be because healthcheck is enabled.
tt, err := topoproto.ParseTabletType(*targetTabletType)
if err != nil {
return false, fmt.Errorf("can't determine if tablet is master-eligible: currently SPARE and no -target_tablet_type flag specified")
}
if tt == topodatapb.TabletType_REPLICA {
return true, nil
}
}
return false, nil
}
func (agent *ActionAgent) enableSemiSync(master bool) error {
// Only enable if we're eligible for becoming master (REPLICA type).
// Ineligible slaves (RDONLY) shouldn't ACK because we'll never promote them.
masterEligible, err := agent.isMasterEligible()
if err != nil {
return fmt.Errorf("can't enable semi-sync: %v", err)
}
if !masterEligible {
return nil
}
// Always enable slave-side since it doesn't hurt to keep it on for a master.
// The master-side needs to be off for a slave, or else it will get stuck.
return agent.MysqlDaemon.SetSemiSyncEnabled(master, true)
}
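
Concretely, when -enable_semi_sync is set, a promoted master ends up with (master=on, slave=on), a REPLICA with (master=off, slave=on), and an ineligible tablet such as RDONLY is left untouched so it never ACKs. A hypothetical helper encoding that matrix, ignoring the SPARE special case handled by isMasterEligible above:

// semiSyncSettings sketches the settings enableSemiSync produces.
// apply=false means the tablet's semi-sync state is left alone.
func semiSyncSettings(tabletType topodatapb.TabletType, becomingMaster bool) (master, slave, apply bool) {
	switch tabletType {
	case topodatapb.TabletType_MASTER, topodatapb.TabletType_REPLICA:
		// Master-eligible: slave side always on; master side only on promote.
		return becomingMaster, true, true
	default:
		// RDONLY and other ineligible types must never ACK a semi-sync master.
		return false, false, false
	}
}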

View file

@ -138,6 +138,8 @@ func TestEmergencyReparentShard(t *testing.T) {
if goodSlave2.FakeMysqlDaemon.Replicating {
t.Errorf("goodSlave2.FakeMysqlDaemon.Replicating set")
}
checkSemiSyncEnabled(t, true, true, newMaster)
checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2)
}
// TestEmergencyReparentShardMasterElectNotBest tries to emergency reparent

View file

@ -122,6 +122,8 @@ func TestInitMasterShard(t *testing.T) {
if err := goodSlave2.FakeMysqlDaemon.CheckSuperQueryList(); err != nil {
t.Fatalf("goodSlave2.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err)
}
checkSemiSyncEnabled(t, true, true, master)
checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2)
}
// TestInitMasterShardChecks makes sure the safety checks work

View file

@ -141,4 +141,7 @@ func TestPlannedReparentShard(t *testing.T) {
if goodSlave2.FakeMysqlDaemon.Replicating {
t.Errorf("goodSlave2.FakeMysqlDaemon.Replicating set")
}
checkSemiSyncEnabled(t, true, true, newMaster)
checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2, oldMaster)
}

View file

@ -134,4 +134,5 @@ func TestReparentTablet(t *testing.T) {
if err := slave.FakeMysqlDaemon.CheckSuperQueryList(); err != nil {
t.Fatalf("slave.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err)
}
checkSemiSyncEnabled(t, false, true, slave)
}

View file

@ -0,0 +1,28 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testlib
import (
"testing"
"github.com/youtube/vitess/go/vt/tabletmanager"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)
func init() {
// Enable semi-sync for all testlib tests.
*tabletmanager.EnableSemiSync = true
}
func checkSemiSyncEnabled(t *testing.T, master, slave bool, tablets ...*FakeTablet) {
for _, tablet := range tablets {
if got, want := tablet.FakeMysqlDaemon.SemiSyncMasterEnabled, master; got != want {
t.Errorf("%v: SemiSyncMasterEnabled = %v, want %v", topoproto.TabletAliasString(tablet.Tablet.Alias), got, want)
}
if got, want := tablet.FakeMysqlDaemon.SemiSyncSlaveEnabled, slave; got != want {
t.Errorf("%v: SemiSyncSlaveEnabled = %v, want %v", topoproto.TabletAliasString(tablet.Tablet.Alias), got, want)
}
}
}

View file

@ -13,12 +13,14 @@ import (
"google.golang.org/grpc"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtctl/grpcvtctlserver"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
// we need to import the grpcvtctlclient library so the gRPC
// vtctl client is registered and can be used.
_ "github.com/youtube/vitess/go/vt/vtctl/grpcvtctlclient"
@ -80,7 +82,7 @@ func (vp *VtctlPipe) Run(args []string) error {
return fmt.Errorf("VtctlPipe.Run() failed: %v", err)
}
for le := range c {
vp.t.Logf(le.String())
vp.t.Logf(logutil.EventString(le))
}
return errFunc()
}

View file

@ -74,6 +74,7 @@ class TestBackup(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_master, tablet_replica1, tablet_replica2]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
_create_vt_insert_test = '''create table vt_insert_test (

View file

@ -15,24 +15,24 @@ import environment
import tablet
import utils
# shards
# shards need at least 1 replica for semi-sync ACK, and 1 rdonly for SplitQuery.
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_rdonly = tablet.Tablet()
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
shard_1_rdonly = tablet.Tablet()
all_tablets = [shard_0_master, shard_0_replica, shard_0_rdonly,
shard_1_master, shard_1_replica, shard_1_rdonly]
def setUpModule():
try:
environment.topo_server().setup()
setup_procs = [
shard_0_master.init_mysql(),
shard_0_rdonly.init_mysql(),
shard_1_master.init_mysql(),
shard_1_rdonly.init_mysql(),
]
setup_procs = [t.init_mysql() for t in all_tablets]
utils.Vtctld().start()
utils.VtGate().start()
utils.wait_procs(setup_procs)
@ -46,22 +46,15 @@ def tearDownModule():
if utils.options.skip_teardown:
return
teardown_procs = [
shard_0_master.teardown_mysql(),
shard_0_rdonly.teardown_mysql(),
shard_1_master.teardown_mysql(),
shard_1_rdonly.teardown_mysql(),
]
teardown_procs = [t.teardown_mysql() for t in all_tablets]
utils.wait_procs(teardown_procs, raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
shard_0_master.remove_tree()
shard_0_rdonly.remove_tree()
shard_1_master.remove_tree()
shard_1_rdonly.remove_tree()
for t in all_tablets:
t.remove_tree()
class TestCustomSharding(unittest.TestCase):
@ -118,11 +111,12 @@ class TestCustomSharding(unittest.TestCase):
# start the first shard only for now
shard_0_master.init_tablet('master', 'test_keyspace', '0')
shard_0_replica.init_tablet('replica', 'test_keyspace', '0')
shard_0_rdonly.init_tablet('rdonly', 'test_keyspace', '0')
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
t.create_db('vt_test_keyspace')
t.start_vttablet(wait_for_state=None)
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
@ -143,7 +137,7 @@ primary key (id)
auto_log=True)
# reload schema everywhere so the QueryService knows about the tables
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
# insert data on shard 0
@ -154,10 +148,11 @@ primary key (id)
# create shard 1
shard_1_master.init_tablet('master', 'test_keyspace', '1')
shard_1_replica.init_tablet('replica', 'test_keyspace', '1')
shard_1_rdonly.init_tablet('rdonly', 'test_keyspace', '1')
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
t.start_vttablet(wait_for_state=None)
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
t.wait_for_vttablet_state('NOT_SERVING')
s = utils.run_vtctl_json(['GetShard', 'test_keyspace/1'])
self.assertEqual(len(s['served_types']), 3)
@ -166,7 +161,7 @@ primary key (id)
shard_1_master.tablet_alias], auto_log=True)
utils.run_vtctl(['CopySchemaShard', shard_0_rdonly.tablet_alias,
'test_keyspace/1'], auto_log=True)
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
utils.run_vtctl(['RefreshState', t.tablet_alias], auto_log=True)
t.wait_for_vttablet_state('SERVING')
@ -189,7 +184,7 @@ primary key (id)
auto_log=True)
# reload schema everywhere so the QueryService knows about the tables
for t in [shard_0_master, shard_0_rdonly, shard_1_master, shard_1_rdonly]:
for t in all_tablets:
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
# insert and read data on all shards
@ -240,7 +235,8 @@ primary key (id)
def _check_shards_count_in_srv_keyspace(self, shard_count):
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
check_types = set([topodata_pb2.MASTER, topodata_pb2.RDONLY])
check_types = set([topodata_pb2.MASTER, topodata_pb2.REPLICA,
topodata_pb2.RDONLY])
for p in ks['partitions']:
if p['served_type'] in check_types:
self.assertEqual(len(p['shard_references']), shard_count)

View file

@ -21,7 +21,12 @@ class TestEnv(object):
self.tablet_map = {}
def launch(
self, keyspace, shards=None, replica_count=0, rdonly_count=0, ddls=None):
self, keyspace, shards=None, replica_count=1, rdonly_count=0, ddls=None):
"""Launch test environment."""
if replica_count < 1:
raise Exception('replica_count=%d < 1; tests now use semi-sync'
' and must have at least one replica' % replica_count)
self.tablets = []
utils.run_vtctl(['CreateKeyspace', keyspace])
if not shards or shards[0] == '0':
@ -52,8 +57,6 @@ class TestEnv(object):
if t.tablet_type == 'master':
utils.run_vtctl(['InitShardMaster', keyspace+'/'+t.shard,
t.tablet_alias], auto_log=True)
# Force read-write even if there are no replicas.
utils.run_vtctl(['SetReadWrite', t.tablet_alias], auto_log=True)
for ddl in ddls:
fname = os.path.join(environment.tmproot, 'ddl.sql')
@ -70,6 +73,8 @@ class TestEnv(object):
t.remove_tree()
def _start_tablet(self, keyspace, shard, tablet_type, index):
"""Start a tablet."""
t = tablet.Tablet()
self.tablets.append(t)
if tablet_type == 'master':

View file

@ -1,4 +1,5 @@
#!/usr/bin/env python
"""Define abstractions for various MySQL flavors."""
import environment
import logging
@ -28,6 +29,15 @@ class MysqlFlavor(object):
def change_master_commands(self, host, port, pos):
raise NotImplementedError()
def set_semi_sync_enabled_commands(self, master=None, slave=None):
"""Returns commands to turn semi-sync on/off."""
cmds = []
if master is not None:
cmds.append("SET GLOBAL rpl_semi_sync_master_enabled = %d" % master)
if slave is not None:
cmds.append("SET GLOBAL rpl_semi_sync_slave_enabled = %d" % master)
return cmds
def extra_my_cnf(self):
"""Returns the path to an extra my_cnf file, or None."""
return None
@ -157,6 +167,11 @@ def mysql_flavor():
def set_mysql_flavor(flavor):
"""Set the object that will be returned by mysql_flavor().
If flavor is not specified, set it based on MYSQL_FLAVOR environment variable.
"""
global __mysql_flavor
if not flavor:

View file

@ -64,6 +64,7 @@ class TestMysqlctl(unittest.TestCase):
tablet.Tablet.check_vttablet_count()
for t in [master_tablet, replica_tablet]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
def test_mysqlctl_restart(self):

View file

@ -67,6 +67,7 @@ class TestReparent(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_62344, tablet_62044, tablet_41983, tablet_31981]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
super(TestReparent, self).tearDown()
@ -320,13 +321,7 @@ class TestReparent(unittest.TestCase):
self._check_master_cell('test_nj', shard_id, 'test_nj')
self._check_master_cell('test_ny', shard_id, 'test_nj')
# Convert two replica to spare. That should leave only one node
# serving traffic, but still needs to appear in the replication
# graph.
utils.run_vtctl(['ChangeSlaveType', tablet_41983.tablet_alias, 'spare'])
utils.run_vtctl(['ChangeSlaveType', tablet_31981.tablet_alias, 'spare'])
utils.validate_topology()
self._check_db_addr(shard_id, 'replica', tablet_62044.port)
# Run this to make sure it succeeds.
utils.run_vtctl(['ShardReplicationPositions', 'test_keyspace/' + shard_id],
@ -569,13 +564,12 @@ class TestReparent(unittest.TestCase):
wait_for_start=False)
tablet_31981.init_tablet('replica', 'test_keyspace', shard_id, start=True,
wait_for_start=False)
tablet_41983.init_tablet('spare', 'test_keyspace', shard_id, start=True,
tablet_41983.init_tablet('replica', 'test_keyspace', shard_id, start=True,
wait_for_start=False)
# wait for all tablets to start
for t in [tablet_62344, tablet_62044, tablet_31981]:
for t in [tablet_62344, tablet_62044, tablet_31981, tablet_41983]:
t.wait_for_vttablet_state('SERVING')
tablet_41983.wait_for_vttablet_state('NOT_SERVING')
# Recompute the shard layout node - until you do that, it might not be
# valid.

View file

@ -472,7 +472,7 @@ primary key (name)
shard_0_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '-80')
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_slave1.init_tablet('replica', 'test_keyspace', '80-')
shard_1_slave2.init_tablet('spare', 'test_keyspace', '80-')
shard_1_slave2.init_tablet('replica', 'test_keyspace', '80-')
shard_1_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '80-')
shard_1_rdonly1.init_tablet('rdonly', 'test_keyspace', '80-')
@ -497,7 +497,7 @@ primary key (name)
shard_0_ny_rdonly.wait_for_vttablet_state('SERVING')
shard_1_master.wait_for_vttablet_state('SERVING')
shard_1_slave1.wait_for_vttablet_state('SERVING')
shard_1_slave2.wait_for_vttablet_state('NOT_SERVING') # spare
shard_1_slave2.wait_for_vttablet_state('SERVING')
shard_1_ny_rdonly.wait_for_vttablet_state('SERVING')
shard_1_rdonly1.wait_for_vttablet_state('SERVING')
@ -521,10 +521,10 @@ primary key (name)
# create the split shards
shard_2_master.init_tablet('master', 'test_keyspace', '80-c0')
shard_2_replica1.init_tablet('spare', 'test_keyspace', '80-c0')
shard_2_replica2.init_tablet('spare', 'test_keyspace', '80-c0')
shard_2_replica1.init_tablet('replica', 'test_keyspace', '80-c0')
shard_2_replica2.init_tablet('replica', 'test_keyspace', '80-c0')
shard_3_master.init_tablet('master', 'test_keyspace', 'c0-')
shard_3_replica.init_tablet('spare', 'test_keyspace', 'c0-')
shard_3_replica.init_tablet('replica', 'test_keyspace', 'c0-')
shard_3_rdonly1.init_tablet('rdonly', 'test_keyspace', 'c0-')
# start vttablet on the split shards (no db created,

View file

@ -19,6 +19,11 @@ warnings.simplefilter('ignore')
master_tablet = tablet.Tablet()
replica_tablet = tablet.Tablet()
# Second replica to provide semi-sync ACKs while testing
# scenarios when the first replica is down.
replica2_tablet = tablet.Tablet()
all_tablets = [master_tablet, replica_tablet, replica2_tablet]
create_vt_insert_test = '''create table vt_insert_test (
id bigint auto_increment,
@ -32,9 +37,7 @@ def setUpModule():
environment.topo_server().setup()
# start mysql instance external to the test
setup_procs = [master_tablet.init_mysql(),
replica_tablet.init_mysql()]
utils.wait_procs(setup_procs)
utils.wait_procs([t.init_mysql() for t in all_tablets])
# start a vtctld so the vtctl insert commands are just RPCs, not forks
utils.Vtctld().start()
@ -44,15 +47,16 @@ def setUpModule():
utils.run_vtctl(['CreateKeyspace', 'test_keyspace'])
master_tablet.init_tablet('master', 'test_keyspace', '0')
replica_tablet.init_tablet('replica', 'test_keyspace', '0')
replica2_tablet.init_tablet('replica', 'test_keyspace', '0')
utils.validate_topology()
master_tablet.populate('vt_test_keyspace', create_vt_insert_test)
replica_tablet.populate('vt_test_keyspace', create_vt_insert_test)
for t in all_tablets:
t.populate('vt_test_keyspace', create_vt_insert_test)
master_tablet.start_vttablet(memcache=True, wait_for_state=None)
replica_tablet.start_vttablet(memcache=True, wait_for_state=None)
master_tablet.wait_for_vttablet_state('SERVING')
replica_tablet.wait_for_vttablet_state('SERVING')
for t in all_tablets:
t.start_vttablet(memcache=True, wait_for_state=None)
for t in all_tablets:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
master_tablet.tablet_alias], auto_log=True)
@ -71,16 +75,15 @@ def tearDownModule():
if utils.options.skip_teardown:
return
logging.debug('Tearing down the servers and setup')
tablet.kill_tablets([master_tablet, replica_tablet])
teardown_procs = [master_tablet.teardown_mysql(),
replica_tablet.teardown_mysql()]
utils.wait_procs(teardown_procs, raise_on_error=False)
tablet.kill_tablets(all_tablets)
utils.wait_procs([t.teardown_mysql() for t in all_tablets],
raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
master_tablet.remove_tree()
replica_tablet.remove_tree()
for t in all_tablets:
t.remove_tree()
class MultiDict(dict):

View file

@ -118,6 +118,8 @@ def _teardown_shard_2():
['DeleteShard', '-recursive', 'test_keyspace/2'], auto_log=True)
for t in shard_2_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()

View file

@ -266,6 +266,12 @@ class Tablet(object):
def reset_replication(self):
self.mquery('', mysql_flavor().reset_replication_commands())
def set_semi_sync_enabled(self, master=None, slave=None):
logging.debug('mysql(%s): setting semi-sync mode: master=%s, slave=%s',
self.tablet_uid, master, slave)
self.mquery('',
mysql_flavor().set_semi_sync_enabled_commands(master, slave))
def populate(self, dbname, create_sql, insert_sqls=None):
self.create_db(dbname)
if isinstance(create_sql, basestring):
@ -342,6 +348,8 @@ class Tablet(object):
tablet_index=None,
start=False, dbname=None, parent=True, wait_for_start=True,
include_mysql_port=True, **kwargs):
"""Initialize a tablet's record in topology."""
self.tablet_type = tablet_type
self.keyspace = keyspace
self.shard = shard
@ -399,7 +407,7 @@ class Tablet(object):
extra_args=None, extra_env=None, include_mysql_port=True,
init_tablet_type=None, init_keyspace=None,
init_shard=None, init_db_name_override=None,
supports_backups=False, grace_period='1s'):
supports_backups=False, grace_period='1s', enable_semi_sync=True):
"""Starts a vttablet process, and returns it.
The process is also saved in self.proc, so it's easy to kill as well.
@ -422,6 +430,8 @@ class Tablet(object):
args.extend(['-binlog_player_healthcheck_retry_delay', '1s'])
args.extend(['-binlog_player_retry_delay', '1s'])
args.extend(['-pid_file', os.path.join(self.tablet_dir, 'vttablet.pid')])
if enable_semi_sync:
args.append('-enable_semi_sync')
if self.use_mysqlctld:
args.extend(
['-mysqlctl_socket', os.path.join(self.tablet_dir, 'mysqlctl.sock')])

View file

@ -66,6 +66,7 @@ class TestTabletManager(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_62344, tablet_62044]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
def _check_srv_shard(self):

View file

@ -15,13 +15,11 @@ import utils
# range '' - 80
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_spare = tablet.Tablet()
# range 80 - ''
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
# all tablets
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica,
shard_0_spare]
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica]
def setUpModule():
@ -67,8 +65,7 @@ class TestVtctld(unittest.TestCase):
'redirected_keyspace'])
shard_0_master.init_tablet('master', 'test_keyspace', '-80')
shard_0_replica.init_tablet('spare', 'test_keyspace', '-80')
shard_0_spare.init_tablet('spare', 'test_keyspace', '-80')
shard_0_replica.init_tablet('replica', 'test_keyspace', '-80')
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_replica.init_tablet('replica', 'test_keyspace', '80-')
@ -86,18 +83,11 @@ class TestVtctld(unittest.TestCase):
target_tablet_type='replica',
wait_for_state=None)
shard_0_spare.start_vttablet(wait_for_state=None,
extra_args=utils.vtctld.process_args())
# wait for the right states
for t in [shard_0_master, shard_1_master, shard_1_replica]:
t.wait_for_vttablet_state('SERVING')
for t in [shard_0_replica, shard_0_spare]:
t.wait_for_vttablet_state('NOT_SERVING')
shard_0_replica.wait_for_vttablet_state('NOT_SERVING')
for t in [shard_0_master, shard_0_replica, shard_0_spare,
shard_1_master, shard_1_replica]:
t.reset_replication()
utils.run_vtctl(['InitShardMaster', 'test_keyspace/-80',
shard_0_master.tablet_alias], auto_log=True)
utils.run_vtctl(['InitShardMaster', 'test_keyspace/80-',

View file

@ -829,8 +829,6 @@ class TestFailures(BaseTestCase):
self.master_tablet = shard_1_master
self.master_tablet.kill_vttablet()
self.tablet_start(self.master_tablet, 'replica')
utils.run_vtctl(['InitShardMaster', KEYSPACE_NAME+'/-80',
shard_0_master.tablet_alias], auto_log=True)
self.master_tablet.wait_for_vttablet_state('SERVING')
self.replica_tablet = shard_1_replica1
self.replica_tablet.kill_vttablet()

View file

@ -384,6 +384,7 @@ class TestBaseSplitClone(unittest.TestCase):
for shard_tablet in [all_shard_tablets, shard_0_tablets, shard_1_tablets]:
for t in shard_tablet.all_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
t.kill_vttablet()
# we allow failures here as some tablets will be gone sometimes
@ -420,12 +421,18 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
6. Verify that the data was copied successfully to both new shards
Args:
mysql_down: boolean, True iff we expect the MySQL instances on the
destination masters to be down.
mysql_down: boolean. If True, we take down the MySQL instances on the
destination masters at first, then bring them back and reparent away.
Raises:
AssertionError if things didn't go as expected.
"""
if mysql_down:
logging.debug('Shutting down mysqld on destination masters.')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg(
['--cell', 'test_nj'],
auto_log=True)
@ -450,12 +457,21 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
"expected vtworker to retry, but it didn't")
logging.debug('Worker has resolved at least twice, starting reparent now')
# Original masters have no running MySQL, so need to force the reparent.
# Bring back masters. Since we test with semi-sync now, we need at least
# one replica for the new master. This test is already quite expensive,
# so we bring back the old master as a replica rather than having a third
# replica up the whole time.
logging.debug('Restarting mysqld on destination masters')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
# Reparent away from the old masters.
utils.run_vtctl(
['EmergencyReparentShard', 'test_keyspace/-80',
['PlannedReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(
['EmergencyReparentShard', 'test_keyspace/80-',
['PlannedReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
else:
@ -523,35 +539,6 @@ class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency):
class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency):
def setUp(self):
"""Shuts down MySQL on the destination masters.
Also runs base setup.
"""
try:
logging.debug('Starting base setup for MysqlDownDuringWorkerCopy')
super(TestMysqlDownDuringWorkerCopy, self).setUp()
logging.debug('Starting MysqlDownDuringWorkerCopy-specific setup')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
logging.debug('Finished MysqlDownDuringWorkerCopy-specific setup')
except:
self.tearDown()
raise
def tearDown(self):
"""Restarts the MySQL processes that were killed during the setup."""
logging.debug('Starting MysqlDownDuringWorkerCopy-specific tearDown')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
logging.debug('Finished MysqlDownDuringWorkerCopy-specific tearDown')
super(TestMysqlDownDuringWorkerCopy, self).tearDown()
logging.debug('Finished base tearDown for MysqlDownDuringWorkerCopy')
def test_mysql_down_during_worker_copy(self):
"""This test simulates MySQL being down on the destination masters."""
self.verify_successful_worker_copy_with_reparent(mysql_down=True)