test: Add end-to-end test for automated horizontal resharding.

This commit is contained in:
Michael Berlin 2015-07-09 22:26:30 -07:00
Родитель 3bd105ce3c
Коммит b6c924463e
4 изменённых файлов: 61 добавлений и 439 удалений

Просмотреть файл

@ -103,7 +103,8 @@ medium_integration_test_files = \
vtdb_test.py \
vtgate_utils_test.py \
rowcache_invalidator.py \
worker.py \
large_integration_test_files = \
vtgatev2_test.py \

Просмотреть файл

@ -1,457 +1,57 @@
#!/usr/bin/env python
# Copyright 2013, Google Inc. All rights reserved.
# Copyright 2015, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
Tests the robustness and resiliency of vtworkers.
End-to-end test for horizontal resharding automation.
import logging
import unittest
from collections import namedtuple
from vtdb import keyrange_constants
import environment
import utils
import tablet
KEYSPACE_ID_TYPE = keyrange_constants.KIT_UINT64
class ShardTablets(namedtuple('ShardTablets', 'master replicas rdonlys')):
"""ShardTablets is a container for all the tablet.Tablets of a shard.
`master` should be a single Tablet, while `replicas` and `rdonlys` should be
lists of Tablets of the appropriate types.
def all_tablets(self):
"""Returns a list of all the tablets of the shard.
Does not guarantee any ordering on the returned tablets.
return [self.master] + self.replicas + self.rdonlys
def replica(self):
"""Returns the first Tablet in `replicas`, or None if `replicas` is empty."""
if self.replicas:
return self.replicas[0]
return None
def rdonly(self):
"""Returns the first Tablet in `rdonlys`, or None if `rdonlys` is empty."""
if self.rdonlys:
return self.rdonlys[0]
return None
# initial shard, covers everything
shard_master = tablet.Tablet()
shard_replica = tablet.Tablet()
shard_rdonly1 = tablet.Tablet()
# split shards
# range "" - 80
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_rdonly1 = tablet.Tablet()
# range 80 - ""
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
shard_1_rdonly1 = tablet.Tablet()
shard_tablets = ShardTablets(shard_master, [shard_replica], [shard_rdonly1])
shard_0_tablets = ShardTablets(shard_0_master, [shard_0_replica], [shard_0_rdonly1])
shard_1_tablets = ShardTablets(shard_1_master, [shard_1_replica], [shard_1_rdonly1])
def init_keyspace():
"""Creates the `test_keyspace` keyspace by running vtctl CreateKeyspace.

The sharding column is named `keyspace_id` and its type is KEYSPACE_ID_TYPE.
"""
utils.run_vtctl(['CreateKeyspace', '-sharding_column_name', 'keyspace_id',
'-sharding_column_type', KEYSPACE_ID_TYPE,'test_keyspace'])
import worker
def setUpModule():
setup_procs = [
def tearDownModule():
if utils.options.skip_teardown:
teardown_procs = [
utils.wait_procs(teardown_procs, raise_on_error=False)
class TestBaseSplitCloneResiliency(unittest.TestCase):
"""Tests that the SplitClone worker is resilient to particular failures."""
def run_shard_tablets(self, shard_name, shard_tablets, create_db=True, create_table=True, wait_state='SERVING'):
"""Handles all the necessary work for initially running a shard's tablets.
This encompasses the following steps:
1. InitTablet for the appropriate tablets and types
2. (optional) Create db
3. Starting vttablets
4. Waiting for the appropriate vttablet state
5. Force reparent to the master tablet
6. RebuildKeyspaceGraph
7. (optional) Running initial schema setup
shard_name - the name of the shard to start tablets in
shard_tablets - an instance of ShardTablets for the given shard
wait_state - string, the vttablet state that we should wait for
create_db - boolean, True iff we should create a db on the tablets
create_table - boolean, True iff we should create a table on the tablets
class TestAutomationHorizontalResharding(worker.TestBaseSplitCloneResiliency):
"""This test reuses worker.py because worker.py also covers the happy path
of the horizontal resharding code. Instead of running the different resharding
steps "manually" as part of the test, this test lets the automation cluster
operation run them.
shard_tablets.master.init_tablet('master', 'test_keyspace', shard_name)
for tablet in shard_tablets.replicas:
tablet.init_tablet('replica', 'test_keyspace', shard_name)
for tablet in shard_tablets.rdonlys:
tablet.init_tablet('rdonly', 'test_keyspace', shard_name)
# Start tablets (and possibly create databases)
for tablet in shard_tablets.all_tablets:
if create_db:
# Wait for tablet state to change after starting all tablets. This allows
# us to start all tablets at once, instead of sequentially waiting.
for tablet in shard_tablets.all_tablets:
# Reparent to choose an initial master
utils.run_vtctl(['InitShardMaster', 'test_keyspace/%s' % shard_name,
shard_tablets.master.tablet_alias], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
create_table_sql = (
'create table worker_test('
'id bigint unsigned,'
'msg varchar(64),'
'keyspace_id bigint(20) unsigned not null,'
'primary key (id),'
'index by_msg (msg)'
') Engine=InnoDB'
if create_table:
'-sql=' + create_table_sql,
def test_regular_operation(self):
# Use a dedicated worker to run all vtworker commands.
worker_proc, _, worker_rpc_port = utils.run_vtworker_bg(
['--cell', 'test_nj'],
def _insert_values(self, tablet, id_offset, msg, keyspace_id, num_values):
"""Inserts values in the MySQL database along with the required routing comments.
automation_server_proc, automation_server_port = utils.run_automation_server()
tablet - the Tablet instance to insert into
id_offset - the value of the `id` column for the first row; row i gets id_offset + i
msg - the value of `msg` column
keyspace_id - the value of `keyspace_id` column
num_values - the number of rows to insert
k = "%u" % keyspace_id
values_str = ''
for i in xrange(num_values):
if i != 0:
values_str += ','
values_str += '(%u, "%s", 0x%x)' % (id_offset + i, msg, keyspace_id)
tablet.mquery('vt_test_keyspace', [
'insert into worker_test(id, msg, keyspace_id) values%s /* EMD keyspace_id:%s*/' % (values_str, k),
], write=True)
keyspace = 'test_keyspace'
source_shard_list = "0"
dest_shard_list = "-80,80-"
utils.run(environment.binary_argstr('automation_client') +
' --server localhost:' + str(automation_server_port) +
' --task HorizontalReshardingTask' +
' --param keyspace=' + keyspace +
' --param source_shard_list=' + source_shard_list +
' --param dest_shard_list=' + dest_shard_list +
' --param source_shard_rdonly_list=' + worker.shard_rdonly1.tablet_alias +
' --param dest_shard_rdonly_list=' + worker.shard_0_rdonly1.tablet_alias + ',' + worker.shard_1_rdonly1.tablet_alias +
' --param vtctld_endpoint=' + utils.vtctld.rpc_endpoint() +
' --param vtworker_endpoint=localhost:' + str(worker_rpc_port))
def insert_values(self, tablet, num_values, num_shards, offset=0, keyspace_id_range=2**64):
"""Inserts simple values, one for each potential shard.
self.assert_shard_data_equal(0, worker.shard_master, worker.shard_0_tablets.replica)
self.assert_shard_data_equal(1, worker.shard_master, worker.shard_1_tablets.replica)
Each row is given a message that contains the shard number, so we can easily
verify that the source and destination shards have the same data.
tablet - the Tablet instance to insert into
num_values - the number of values to insert
num_shards - the number of shards that we expect to have
offset - amount that we should offset the `id`s by. This is useful for
inserting values multiple times.
keyspace_id_range - the number of distinct values that the keyspace id can have
shard_width = keyspace_id_range / num_shards
shard_offsets = [i * shard_width for i in xrange(num_shards)]
for shard_num in xrange(num_shards):
shard_offsets[shard_num] + offset,
'msg-shard-%u' % shard_num,
def assert_shard_data_equal(self, shard_num, source_tablet, destination_tablet):
"""Asserts that a shard's data is identical on source and destination tablets.
shard_num - the shard number of the shard that we want to verify the data of
source_tablet - Tablet instance of the source shard
destination_tablet - Tablet instance of the destination shard
select_query = 'select * from worker_test where msg="msg-shard-%s" order by id asc' % shard_num
# Make sure all the right rows made it from the source to the destination
source_rows = source_tablet.mquery('vt_test_keyspace', select_query)
destination_rows = destination_tablet.mquery('vt_test_keyspace', select_query)
self.assertEqual(source_rows, destination_rows)
# Make sure that there are no extra rows on the destination
count_query = 'select count(*) from worker_test'
destination_count = destination_tablet.mquery('vt_test_keyspace', count_query)[0][0]
self.assertEqual(destination_count, len(destination_rows))
def run_split_diff(self, keyspace_shard, source_tablets, destination_tablets):
"""Runs a vtworker SplitDiff on the given keyspace/shard, and then sets all
former rdonly slaves back to rdonly.
keyspace_shard - keyspace/shard to run SplitDiff on (string)
source_tablets - ShardTablets instance for the source shard
destination_tablets - ShardTablets instance for the destination shard
logging.debug("Running vtworker SplitDiff for %s" % keyspace_shard)
stdout, stderr = utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff',
keyspace_shard], auto_log=True)
for shard_tablets in (source_tablets, destination_tablets):
for tablet in shard_tablets.rdonlys:
utils.run_vtctl(['ChangeSlaveType', tablet.tablet_alias, 'rdonly'],
def setUp(self):
"""Creates the necessary shards, starts the tablets, and inserts some data."""
self.run_shard_tablets('0', shard_tablets)
# create the split shards
self.run_shard_tablets('-80', shard_0_tablets, create_db=False,
create_table=False, wait_state='NOT_SERVING')
self.run_shard_tablets('80-', shard_1_tablets, create_db=False,
create_table=False, wait_state='NOT_SERVING')
logging.debug("Start inserting initial data: %s rows", utils.options.num_insert_rows)
self.insert_values(shard_master, utils.options.num_insert_rows, 2)
logging.debug("Done inserting initial data, waiting for replication to catch up")
utils.wait_for_replication_pos(shard_master, shard_rdonly1)
logging.debug("Replication on source rdonly tablet is caught up")
def tearDown(self):
"""Tries to do the minimum to reset topology and tablets to their initial states.
When benchmarked, this seemed to take around 30% of the time of (setupModule +
for shard_tablet in [shard_tablets, shard_0_tablets, shard_1_tablets]:
for tablet in shard_tablet.all_tablets:
tablet.scrap(force=True, skip_rebuild=True)
utils.run_vtctl(['DeleteTablet', tablet.tablet_alias], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
for shard in ['0', '-80', '80-']:
utils.run_vtctl(['DeleteShard', 'test_keyspace/%s' % shard], auto_log=True)
def verify_successful_worker_copy_with_reparent(self, mysql_down=False):
"""Verifies that vtworker can successfully copy data for a SplitClone.
Order of operations:
1. Copy the schema to the destination shards.
2. Run a background vtworker.
3. Wait until the worker successfully resolves the destination masters.
4. Reparent the destination tablets.
5. Wait until the vtworker copy is finished.
6. Verify that the worker was forced to reresolve topology and retry writes
due to the reparent.
7. Verify that the data was copied successfully to both new shards.
mysql_down - boolean, True iff we expect the MySQL instances on the
destination masters to be down.
AssertionError if things didn't go as expected.
# Copy the schema to the destination shards.
for keyspace_shard in ('test_keyspace/-80', 'test_keyspace/80-'):
'--exclude_tables', 'unrelated',
worker_proc, worker_port, _ = utils.run_vtworker_bg(['--cell', 'test_nj',
'--source_reader_count', '1',
'--destination_pack_count', '1',
'--destination_writer_count', '1',
if mysql_down:
# If MySQL is down, we wait until resolving at least twice (to verify that
# we do reresolve and retry due to MySQL being down).
worker_vars = utils.poll_for_vars('vtworker', worker_port,
'WorkerDestinationActualResolves >= 2',
condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 2)
self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
"expected vtworker to retry, but it didn't")
logging.debug("Worker has resolved at least twice, starting reparent now")
# Original masters have no running MySQL, so need to force the reparent
utils.run_vtctl(['EmergencyReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(['EmergencyReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
utils.poll_for_vars('vtworker', worker_port,
'WorkerDestinationActualResolves >= 1',
condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 1)
logging.debug("Worker has resolved at least once, starting reparent now")
utils.run_vtctl(['PlannedReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(['PlannedReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
logging.debug("Polling for worker state")
# There are a couple of race conditions around this that we need to be careful of:
# 1. It's possible for the reparent step to take so long that the worker will
# actually finish before we get to the polling step. To work around this,
# the test takes a parameter to increase the number of rows that the worker
# has to copy (with the idea being to slow the worker down).
# 2. If the worker has a huge number of rows to copy, it's possible for the
# polling to time out before the worker has finished copying the data.
# You should choose a value for num_insert_rows such that this test passes
# for your environment (trial-and-error...)
worker_vars = utils.poll_for_vars('vtworker', worker_port,
'WorkerState == cleaning up',
condition_fn=lambda v: v.get('WorkerState') == 'cleaning up',
# We know that vars should already be ready, since we read them earlier.
# We're willing to let the test run for longer to make it less flaky.
# This should still fail fast if something goes wrong with vtworker,
# because of the require_vars flag above.
# Verify that we were forced to reresolve and retry.
self.assertGreater(worker_vars['WorkerDestinationActualResolves'], 1)
self.assertGreater(worker_vars['WorkerDestinationAttemptedResolves'], 1)
self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
"expected vtworker to retry, but it didn't")
utils.run_vtctl(['ChangeSlaveType', shard_rdonly1.tablet_alias, 'rdonly'],
# Make sure that everything is caught up to the same replication point.
self.run_split_diff('test_keyspace/-80', shard_tablets, shard_0_tablets)
self.run_split_diff('test_keyspace/80-', shard_tablets, shard_1_tablets)
self.assert_shard_data_equal(0, shard_master, shard_0_tablets.replica)
self.assert_shard_data_equal(1, shard_master, shard_1_tablets.replica)
class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency):
def test_reparent_during_worker_copy(self):
"""This test simulates a destination reparent during a worker SplitClone copy.
The SplitClone command should be able to gracefully handle the reparent and
end up with the correct data on the destination.
Note: this test has a small possibility of flaking, due to the timing issues
involved. It's possible for the worker to finish the copy step before the
reparent succeeds, in which case there are assertions that will fail. This
seems better than having the test silently pass.
class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency):
def setUp(self):
"""Shuts down MySQL on the destination masters (in addition to the base setup)"""
logging.debug("Starting base setup for MysqlDownDuringWorkerCopy")
super(TestMysqlDownDuringWorkerCopy, self).setUp()
logging.debug("Starting MysqlDownDuringWorkerCopy-specific setup")
logging.debug("Finished MysqlDownDuringWorkerCopy-specific setup")
def tearDown(self):
"""Restarts the MySQL processes that were killed during the setup."""
logging.debug("Starting MysqlDownDuringWorkerCopy-specific tearDown")
logging.debug("Finished MysqlDownDuringWorkerCopy-specific tearDown")
super(TestMysqlDownDuringWorkerCopy, self).tearDown()
logging.debug("Finished base tearDown for MysqlDownDuringWorkerCopy")
def test_mysql_down_during_worker_copy(self):
"""This test simulates MySQL being down on the destination masters."""
def add_test_options(parser):
"""Registers the --num_insert_rows option (int, default 3000) on `parser`."""
parser.add_option('--num_insert_rows', type="int", default=3000,
help="The number of rows, per shard, that we should insert before resharding for this test.")
utils.kill_sub_process(automation_server_proc, soft=True)
utils.kill_sub_process(worker_proc, soft=True)
if __name__ == '__main__':

Просмотреть файл

@ -83,6 +83,9 @@
"worker": {
"File": "worker.py"
"automation_horizontal_resharding": {
"File": "automation_horizontal_resharding.py"
"tablet": {
"File": "tablet_test.py"

Просмотреть файл

@ -700,6 +700,24 @@ def run_vtworker_client(args, rpc_port):
return out, err
def run_automation_server(auto_log=False):
"""Starts a background automation_server process.
process, rpc_port - the value returned by run_bg() for the started process, and an int with the port number of the RPC interface
rpc_port = environment.reserve_ports(1)
args = environment.binary_args('automation_server') + [
'-log_dir', environment.vtlogroot,
'-port', str(rpc_port),
'-vtctl_client_protocol', protocols_flavor().vtctl_client_protocol(),
'-vtworker_client_protocol', protocols_flavor().vtworker_client_protocol(),
if auto_log:
args.append('--stderrthreshold=%s' % get_log_level())
return run_bg(args), rpc_port
# mysql helpers
def mysql_query(uid, dbname, query):
conn = MySQLdb.Connect(user='vt_dba',