Use semi-sync in integration tests.

Some tests had to be modified so they no longer assume that a master
without replicas will still accept writes. Also, some tests created
tablets as SPARE without enabling healthcheck. Such a tablet has no way
of knowing whether it will eventually become a replica or rdonly, and
hence can't decide whether to enable semi-sync before starting
replication.

Healthcheck is effectively required now, and will be strictly required
once we fully switch to vtgate discovery mode. So I've removed the cases
where tablets were started SPARE without enabling healthcheck.
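
The recurring pattern in the hunks below: each tearDown now turns
master-side semi-sync off before cleaning up, because a semi-sync master
whose ACKing replicas are already gone would stall on the cleanup
writes. A minimal sketch of that idiom, assuming the test harness's
tablet module as used throughout this commit:

# Sketch of the tearDown idiom this commit adds across the test suite.
# `tablets` is a list of tablet.Tablet instances from the test harness.
def teardown_tablets(tablets):
  for t in tablets:
    t.reset_replication()
    # With the replicas gone, a semi-sync master would block on COMMIT
    # waiting for an ACK, so switch the master-side plugin off first.
    t.set_semi_sync_enabled(master=False)
    t.clean_dbs()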
Anthony Yeh 2016-02-23 19:32:35 -08:00
Parent: 6ed85e04ff
Commit: 44d10d40c4
14 changed files with 110 additions and 107 deletions

View file

@@ -74,6 +74,7 @@ class TestBackup(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_master, tablet_replica1, tablet_replica2]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
_create_vt_insert_test = '''create table vt_insert_test (

View file

@@ -15,24 +15,24 @@ import environment
import tablet
import utils
# shards
# shards need at least 1 replica for semi-sync ACK, and 1 rdonly for SplitQuery.
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_rdonly = tablet.Tablet()
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
shard_1_rdonly = tablet.Tablet()
all_tablets = [shard_0_master, shard_0_replica, shard_0_rdonly,
shard_1_master, shard_1_replica, shard_1_rdonly]
def setUpModule():
try:
environment.topo_server().setup()
setup_procs = [
shard_0_master.init_mysql(),
shard_0_rdonly.init_mysql(),
shard_1_master.init_mysql(),
shard_1_rdonly.init_mysql(),
]
setup_procs = [t.init_mysql() for t in all_tablets]
utils.Vtctld().start()
utils.VtGate().start()
utils.wait_procs(setup_procs)
@@ -46,22 +46,15 @@ def tearDownModule():
if utils.options.skip_teardown:
return
teardown_procs = [
shard_0_master.teardown_mysql(),
shard_0_rdonly.teardown_mysql(),
shard_1_master.teardown_mysql(),
shard_1_rdonly.teardown_mysql(),
]
teardown_procs = [t.teardown_mysql() for t in all_tablets]
utils.wait_procs(teardown_procs, raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
shard_0_master.remove_tree()
shard_0_rdonly.remove_tree()
shard_1_master.remove_tree()
shard_1_rdonly.remove_tree()
for t in all_tablets:
t.remove_tree()
class TestCustomSharding(unittest.TestCase):
@@ -118,11 +111,12 @@ class TestCustomSharding(unittest.TestCase):
# start the first shard only for now
shard_0_master.init_tablet('master', 'test_keyspace', '0')
shard_0_replica.init_tablet('replica', 'test_keyspace', '0')
shard_0_rdonly.init_tablet('rdonly', 'test_keyspace', '0')
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
t.create_db('vt_test_keyspace')
t.start_vttablet(wait_for_state=None)
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
@@ -143,7 +137,7 @@ primary key (id)
auto_log=True)
# reload schema everywhere so the QueryService knows about the tables
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
# insert data on shard 0
@@ -154,10 +148,11 @@ primary key (id)
# create shard 1
shard_1_master.init_tablet('master', 'test_keyspace', '1')
shard_1_replica.init_tablet('replica', 'test_keyspace', '1')
shard_1_rdonly.init_tablet('rdonly', 'test_keyspace', '1')
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
t.start_vttablet(wait_for_state=None)
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
t.wait_for_vttablet_state('NOT_SERVING')
s = utils.run_vtctl_json(['GetShard', 'test_keyspace/1'])
self.assertEqual(len(s['served_types']), 3)
@@ -166,7 +161,7 @@ primary key (id)
shard_1_master.tablet_alias], auto_log=True)
utils.run_vtctl(['CopySchemaShard', shard_0_rdonly.tablet_alias,
'test_keyspace/1'], auto_log=True)
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
utils.run_vtctl(['RefreshState', t.tablet_alias], auto_log=True)
t.wait_for_vttablet_state('SERVING')
@@ -189,7 +184,7 @@ primary key (id)
auto_log=True)
# reload schema everywhere so the QueryService knows about the tables
for t in [shard_0_master, shard_0_rdonly, shard_1_master, shard_1_rdonly]:
for t in all_tablets:
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
# insert and read data on all shards
@@ -240,7 +235,8 @@ primary key (id)
def _check_shards_count_in_srv_keyspace(self, shard_count):
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
check_types = set([topodata_pb2.MASTER, topodata_pb2.RDONLY])
check_types = set([topodata_pb2.MASTER, topodata_pb2.REPLICA,
topodata_pb2.RDONLY])
for p in ks['partitions']:
if p['served_type'] in check_types:
self.assertEqual(len(p['shard_references']), shard_count)

View file

@@ -21,7 +21,12 @@ class TestEnv(object):
self.tablet_map = {}
def launch(
self, keyspace, shards=None, replica_count=0, rdonly_count=0, ddls=None):
self, keyspace, shards=None, replica_count=1, rdonly_count=0, ddls=None):
"""Launch test environment."""
if replica_count < 1:
raise Exception('replica_count=%d < 1; tests now use semi-sync'
' and must have at least one replica' % replica_count)
self.tablets = []
utils.run_vtctl(['CreateKeyspace', keyspace])
if not shards or shards[0] == '0':
@@ -52,8 +57,6 @@ class TestEnv(object):
if t.tablet_type == 'master':
utils.run_vtctl(['InitShardMaster', keyspace+'/'+t.shard,
t.tablet_alias], auto_log=True)
# Force read-write even if there are no replicas.
utils.run_vtctl(['SetReadWrite', t.tablet_alias], auto_log=True)
for ddl in ddls:
fname = os.path.join(environment.tmproot, 'ddl.sql')
@@ -70,6 +73,8 @@ class TestEnv(object):
t.remove_tree()
def _start_tablet(self, keyspace, shard, tablet_type, index):
"""Start a tablet."""
t = tablet.Tablet()
self.tablets.append(t)
if tablet_type == 'master':
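
Given the new check in launch(), callers must now request at least one
replica. A hedged usage example (argument values are illustrative, not
taken from this commit):

env = TestEnv()
# replica_count now defaults to 1; passing 0 raises immediately, since
# a semi-sync master needs at least one replica to ACK its writes.
env.launch('test_keyspace', shards=['0'], replica_count=1, rdonly_count=1)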

View file

@@ -1,4 +1,5 @@
#!/usr/bin/env python
"""Define abstractions for various MySQL flavors."""
import environment
import logging
@@ -28,6 +29,15 @@ class MysqlFlavor(object):
def change_master_commands(self, host, port, pos):
raise NotImplementedError()
def set_semi_sync_enabled_commands(self, master=None, slave=None):
"""Returns commands to turn semi-sync on/off."""
cmds = []
if master is not None:
cmds.append("SET GLOBAL rpl_semi_sync_master_enabled = %d" % master)
if slave is not None:
cmds.append("SET GLOBAL rpl_semi_sync_slave_enabled = %d" % master)
return cmds
def extra_my_cnf(self):
"""Returns the path to an extra my_cnf file, or None."""
return None
@@ -157,6 +167,11 @@ def mysql_flavor():
def set_mysql_flavor(flavor):
"""Set the object that will be returned by mysql_flavor().
If flavor is not specified, set it based on MYSQL_FLAVOR environment variable.
"""
global __mysql_flavor
if not flavor:
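
For reference, a sketch of how the new flavor hook is consumed (the
tablet.py hunk further down wires it into Tablet.set_semi_sync_enabled);
the %d formatting coerces the boolean flags to 0/1:

from mysql_flavor import mysql_flavor

# Hypothetical direct call; each flag may be True, False, or None (skip).
cmds = mysql_flavor().set_semi_sync_enabled_commands(master=False, slave=True)
# cmds == ['SET GLOBAL rpl_semi_sync_master_enabled = 0',
#          'SET GLOBAL rpl_semi_sync_slave_enabled = 1']
# Tablet.set_semi_sync_enabled() then runs these via self.mquery('', cmds).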

View file

@@ -64,6 +64,7 @@ class TestMysqlctl(unittest.TestCase):
tablet.Tablet.check_vttablet_count()
for t in [master_tablet, replica_tablet]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
def test_mysqlctl_restart(self):

View file

@@ -67,6 +67,7 @@ class TestReparent(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_62344, tablet_62044, tablet_41983, tablet_31981]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
super(TestReparent, self).tearDown()
@@ -320,13 +321,7 @@ class TestReparent(unittest.TestCase):
self._check_master_cell('test_nj', shard_id, 'test_nj')
self._check_master_cell('test_ny', shard_id, 'test_nj')
# Convert two replica to spare. That should leave only one node
# serving traffic, but still needs to appear in the replication
# graph.
utils.run_vtctl(['ChangeSlaveType', tablet_41983.tablet_alias, 'spare'])
utils.run_vtctl(['ChangeSlaveType', tablet_31981.tablet_alias, 'spare'])
utils.validate_topology()
self._check_db_addr(shard_id, 'replica', tablet_62044.port)
# Run this to make sure it succeeds.
utils.run_vtctl(['ShardReplicationPositions', 'test_keyspace/' + shard_id],
@@ -569,13 +564,12 @@ class TestReparent(unittest.TestCase):
wait_for_start=False)
tablet_31981.init_tablet('replica', 'test_keyspace', shard_id, start=True,
wait_for_start=False)
tablet_41983.init_tablet('spare', 'test_keyspace', shard_id, start=True,
tablet_41983.init_tablet('replica', 'test_keyspace', shard_id, start=True,
wait_for_start=False)
# wait for all tablets to start
for t in [tablet_62344, tablet_62044, tablet_31981]:
for t in [tablet_62344, tablet_62044, tablet_31981, tablet_41983]:
t.wait_for_vttablet_state('SERVING')
tablet_41983.wait_for_vttablet_state('NOT_SERVING')
# Recompute the shard layout node - until you do that, it might not be
# valid.

View file

@@ -472,7 +472,7 @@ primary key (name)
shard_0_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '-80')
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_slave1.init_tablet('replica', 'test_keyspace', '80-')
shard_1_slave2.init_tablet('spare', 'test_keyspace', '80-')
shard_1_slave2.init_tablet('replica', 'test_keyspace', '80-')
shard_1_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '80-')
shard_1_rdonly1.init_tablet('rdonly', 'test_keyspace', '80-')
@@ -497,7 +497,7 @@ primary key (name)
shard_0_ny_rdonly.wait_for_vttablet_state('SERVING')
shard_1_master.wait_for_vttablet_state('SERVING')
shard_1_slave1.wait_for_vttablet_state('SERVING')
shard_1_slave2.wait_for_vttablet_state('NOT_SERVING') # spare
shard_1_slave2.wait_for_vttablet_state('SERVING')
shard_1_ny_rdonly.wait_for_vttablet_state('SERVING')
shard_1_rdonly1.wait_for_vttablet_state('SERVING')
@@ -521,10 +521,10 @@ primary key (name)
# create the split shards
shard_2_master.init_tablet('master', 'test_keyspace', '80-c0')
shard_2_replica1.init_tablet('spare', 'test_keyspace', '80-c0')
shard_2_replica2.init_tablet('spare', 'test_keyspace', '80-c0')
shard_2_replica1.init_tablet('replica', 'test_keyspace', '80-c0')
shard_2_replica2.init_tablet('replica', 'test_keyspace', '80-c0')
shard_3_master.init_tablet('master', 'test_keyspace', 'c0-')
shard_3_replica.init_tablet('spare', 'test_keyspace', 'c0-')
shard_3_replica.init_tablet('replica', 'test_keyspace', 'c0-')
shard_3_rdonly1.init_tablet('rdonly', 'test_keyspace', 'c0-')
# start vttablet on the split shards (no db created,

View file

@@ -19,6 +19,11 @@ warnings.simplefilter('ignore')
master_tablet = tablet.Tablet()
replica_tablet = tablet.Tablet()
# Second replica to provide semi-sync ACKs while testing
# scenarios when the first replica is down.
replica2_tablet = tablet.Tablet()
all_tablets = [master_tablet, replica_tablet, replica2_tablet]
create_vt_insert_test = '''create table vt_insert_test (
id bigint auto_increment,
@@ -32,9 +37,7 @@ def setUpModule():
environment.topo_server().setup()
# start mysql instance external to the test
setup_procs = [master_tablet.init_mysql(),
replica_tablet.init_mysql()]
utils.wait_procs(setup_procs)
utils.wait_procs([t.init_mysql() for t in all_tablets])
# start a vtctld so the vtctl insert commands are just RPCs, not forks
utils.Vtctld().start()
@@ -44,15 +47,16 @@ def setUpModule():
utils.run_vtctl(['CreateKeyspace', 'test_keyspace'])
master_tablet.init_tablet('master', 'test_keyspace', '0')
replica_tablet.init_tablet('replica', 'test_keyspace', '0')
replica2_tablet.init_tablet('replica', 'test_keyspace', '0')
utils.validate_topology()
master_tablet.populate('vt_test_keyspace', create_vt_insert_test)
replica_tablet.populate('vt_test_keyspace', create_vt_insert_test)
for t in all_tablets:
t.populate('vt_test_keyspace', create_vt_insert_test)
master_tablet.start_vttablet(memcache=True, wait_for_state=None)
replica_tablet.start_vttablet(memcache=True, wait_for_state=None)
master_tablet.wait_for_vttablet_state('SERVING')
replica_tablet.wait_for_vttablet_state('SERVING')
for t in all_tablets:
t.start_vttablet(memcache=True, wait_for_state=None)
for t in all_tablets:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
master_tablet.tablet_alias], auto_log=True)
@@ -71,16 +75,15 @@ def tearDownModule():
if utils.options.skip_teardown:
return
logging.debug('Tearing down the servers and setup')
tablet.kill_tablets([master_tablet, replica_tablet])
teardown_procs = [master_tablet.teardown_mysql(),
replica_tablet.teardown_mysql()]
utils.wait_procs(teardown_procs, raise_on_error=False)
tablet.kill_tablets(all_tablets)
utils.wait_procs([t.teardown_mysql() for t in all_tablets],
raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
master_tablet.remove_tree()
replica_tablet.remove_tree()
for t in all_tablets:
t.remove_tree()
class MultiDict(dict):

View file

@@ -118,6 +118,8 @@ def _teardown_shard_2():
['DeleteShard', '-recursive', 'test_keyspace/2'], auto_log=True)
for t in shard_2_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()

View file

@@ -266,6 +266,12 @@ class Tablet(object):
def reset_replication(self):
self.mquery('', mysql_flavor().reset_replication_commands())
def set_semi_sync_enabled(self, master=None, slave=None):
logging.debug('mysql(%s): setting semi-sync mode: master=%s, slave=%s',
self.tablet_uid, master, slave)
self.mquery('',
mysql_flavor().set_semi_sync_enabled_commands(master, slave))
def populate(self, dbname, create_sql, insert_sqls=None):
self.create_db(dbname)
if isinstance(create_sql, basestring):
@@ -342,6 +348,8 @@ class Tablet(object):
tablet_index=None,
start=False, dbname=None, parent=True, wait_for_start=True,
include_mysql_port=True, **kwargs):
"""Initialize a tablet's record in topology."""
self.tablet_type = tablet_type
self.keyspace = keyspace
self.shard = shard
@@ -399,7 +407,7 @@ class Tablet(object):
extra_args=None, extra_env=None, include_mysql_port=True,
init_tablet_type=None, init_keyspace=None,
init_shard=None, init_db_name_override=None,
supports_backups=False, grace_period='1s'):
supports_backups=False, grace_period='1s', enable_semi_sync=True):
"""Starts a vttablet process, and returns it.
The process is also saved in self.proc, so it's easy to kill as well.
@@ -422,6 +430,8 @@ class Tablet(object):
args.extend(['-binlog_player_healthcheck_retry_delay', '1s'])
args.extend(['-binlog_player_retry_delay', '1s'])
args.extend(['-pid_file', os.path.join(self.tablet_dir, 'vttablet.pid')])
if enable_semi_sync:
args.append('-enable_semi_sync')
if self.use_mysqlctld:
args.extend(
['-mysqlctl_socket', os.path.join(self.tablet_dir, 'mysqlctl.sock')])
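
Since enable_semi_sync defaults to True, every vttablet started by the
harness now gets -enable_semi_sync unless a test opts out. A hedged
example of the opt-out (the keyword is the one added in the hunk above):

# Default: the -enable_semi_sync flag is passed to vttablet.
t.start_vttablet(wait_for_state='SERVING')

# Hypothetical opt-out for a test that must run a master with no replicas:
t.start_vttablet(wait_for_state='SERVING', enable_semi_sync=False)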

View file

@@ -66,6 +66,7 @@ class TestTabletManager(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_62344, tablet_62044]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
def _check_srv_shard(self):

View file

@@ -15,13 +15,11 @@ import utils
# range '' - 80
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_spare = tablet.Tablet()
# range 80 - ''
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
# all tablets
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica,
shard_0_spare]
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica]
def setUpModule():
@@ -67,8 +65,7 @@ class TestVtctld(unittest.TestCase):
'redirected_keyspace'])
shard_0_master.init_tablet('master', 'test_keyspace', '-80')
shard_0_replica.init_tablet('spare', 'test_keyspace', '-80')
shard_0_spare.init_tablet('spare', 'test_keyspace', '-80')
shard_0_replica.init_tablet('replica', 'test_keyspace', '-80')
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_replica.init_tablet('replica', 'test_keyspace', '80-')
@@ -86,18 +83,11 @@ class TestVtctld(unittest.TestCase):
target_tablet_type='replica',
wait_for_state=None)
shard_0_spare.start_vttablet(wait_for_state=None,
extra_args=utils.vtctld.process_args())
# wait for the right states
for t in [shard_0_master, shard_1_master, shard_1_replica]:
t.wait_for_vttablet_state('SERVING')
for t in [shard_0_replica, shard_0_spare]:
t.wait_for_vttablet_state('NOT_SERVING')
shard_0_replica.wait_for_vttablet_state('NOT_SERVING')
for t in [shard_0_master, shard_0_replica, shard_0_spare,
shard_1_master, shard_1_replica]:
t.reset_replication()
utils.run_vtctl(['InitShardMaster', 'test_keyspace/-80',
shard_0_master.tablet_alias], auto_log=True)
utils.run_vtctl(['InitShardMaster', 'test_keyspace/80-',

View file

@@ -829,8 +829,6 @@ class TestFailures(BaseTestCase):
self.master_tablet = shard_1_master
self.master_tablet.kill_vttablet()
self.tablet_start(self.master_tablet, 'replica')
utils.run_vtctl(['InitShardMaster', KEYSPACE_NAME+'/-80',
shard_0_master.tablet_alias], auto_log=True)
self.master_tablet.wait_for_vttablet_state('SERVING')
self.replica_tablet = shard_1_replica1
self.replica_tablet.kill_vttablet()

View file

@@ -384,6 +384,7 @@ class TestBaseSplitClone(unittest.TestCase):
for shard_tablet in [all_shard_tablets, shard_0_tablets, shard_1_tablets]:
for t in shard_tablet.all_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
t.kill_vttablet()
# we allow failures here as some tablets will be gone sometimes
@@ -420,12 +421,18 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
6. Verify that the data was copied successfully to both new shards
Args:
mysql_down: boolean, True iff we expect the MySQL instances on the
destination masters to be down.
mysql_down: boolean. If True, we take down the MySQL instances on the
destination masters at first, then bring them back and reparent away.
Raises:
AssertionError if things didn't go as expected.
"""
if mysql_down:
logging.debug('Shutting down mysqld on destination masters.')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg(
['--cell', 'test_nj'],
auto_log=True)
@@ -450,12 +457,21 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
"expected vtworker to retry, but it didn't")
logging.debug('Worker has resolved at least twice, starting reparent now')
# Original masters have no running MySQL, so need to force the reparent.
# Bring back masters. Since we test with semi-sync now, we need at least
# one replica for the new master. This test is already quite expensive,
# so we bring back the old master as a replica rather than having a third
# replica up the whole time.
logging.debug('Restarting mysqld on destination masters')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
# Reparent away from the old masters.
utils.run_vtctl(
['EmergencyReparentShard', 'test_keyspace/-80',
['PlannedReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(
['EmergencyReparentShard', 'test_keyspace/80-',
['PlannedReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
else:
@@ -523,35 +539,6 @@ class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency):
class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency):
def setUp(self):
"""Shuts down MySQL on the destination masters.
Also runs base setup.
"""
try:
logging.debug('Starting base setup for MysqlDownDuringWorkerCopy')
super(TestMysqlDownDuringWorkerCopy, self).setUp()
logging.debug('Starting MysqlDownDuringWorkerCopy-specific setup')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
logging.debug('Finished MysqlDownDuringWorkerCopy-specific setup')
except:
self.tearDown()
raise
def tearDown(self):
"""Restarts the MySQL processes that were killed during the setup."""
logging.debug('Starting MysqlDownDuringWorkerCopy-specific tearDown')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
logging.debug('Finished MysqlDownDuringWorkerCopy-specific tearDown')
super(TestMysqlDownDuringWorkerCopy, self).tearDown()
logging.debug('Finished base tearDown for MysqlDownDuringWorkerCopy')
def test_mysql_down_during_worker_copy(self):
"""This test simulates MySQL being down on the destination masters."""
self.verify_successful_worker_copy_with_reparent(mysql_down=True)