Mirror of https://github.com/github/vitess-gh.git
Make test/worker.py do batch insert at setup, so that worker copy and initial insert take similar amounts of time
This commit is contained in:
Parent: 08e0d27dc7
Commit: b575f2f89f
@@ -328,7 +328,7 @@ def wait_for_vars(name, port, var=None):
       break
     timeout = wait_step('waiting for /debug/vars of %s' % name, timeout)

-def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
+def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None, require_vars=False):
   """Polls for debug variables to exist, or match specific conditions, within a timeout.

   This function polls in a tight loop, with no sleeps. This is useful for
@@ -343,9 +343,14 @@ def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
     timeout - number of seconds that we should attempt to poll for.
     condition_fn - a function that takes the debug vars dict as input, and
       returns a truthy value if it matches the success conditions.
+    require_vars - True iff we expect the vars to always exist. If True, and the
+      vars don't exist, we'll raise a TestError. This can be used to differentiate
+      between a timeout waiting for a particular condition vs if the process that
+      you're polling has already exited.

   Raises:
     TestError, if the conditions aren't met within the given timeout
+    TestError, if vars are required and don't exist

   Returns:
     dict of debug variables
@@ -356,6 +361,8 @@ def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
       raise TestError('Timed out polling for vars from %s; condition "%s" not met' % (name, condition_msg))
     _vars = get_vars(port)
     if _vars is None:
+      if require_vars:
+        raise TestError('Expected vars to exist on %s, but they do not; process probably exited earlier than expected.' % (name,))
       continue
     if condition_fn is None:
       return _vars
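For context, the new flag is meant to be consumed as in the sketch below. It mirrors the call added to the worker test later in this commit, but the 'done' state string and the surrounding test are hypothetical stand-ins.

    # Hypothetical caller: with require_vars=True the poll fails fast if the
    # vtworker process has already exited (vars gone) instead of spinning for
    # the full timeout before reporting "condition not met".
    worker_vars = utils.poll_for_vars(
        'vtworker', worker_port,
        'WorkerState == done',
        condition_fn=lambda v: v.get('WorkerState') == 'done',
        require_vars=True)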
@@ -396,7 +403,7 @@ def wait_for_replication_pos(tablet_a, tablet_b, timeout=60.0):
     timeout = wait_step(
       "%s's replication position to catch up %s's; currently at: %s, waiting to catch up to: %s" % (
         tablet_b.tablet_alias, tablet_a.tablet_alias, replication_pos_b, replication_pos_a),
-      timeout
+      timeout, sleep_time=0.1
     )

 # vtgate helpers, assuming it always restarts on the same port
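The sleep_time=0.1 argument is easiest to read against wait_step's assumed contract: sleep for sleep_time, subtract it from the remaining budget, return the new budget, and raise TestError once it runs out. The following is a minimal sketch of that assumed contract, not the actual helper from this module, which may differ in details such as logging and the default sleep.

    import time

    class TestError(Exception):
      """Stand-in for the utils module's own TestError."""

    def wait_step(msg, timeout, sleep_time=1.0):
      # Assumed behaviour: burn sleep_time off the remaining timeout and hand
      # the new budget back to the caller's polling loop.
      timeout -= sleep_time
      if timeout <= 0:
        raise TestError("timeout waiting for condition '%s'" % msg)
      time.sleep(sleep_time)
      return timeout

Under that reading, sleep_time=0.1 simply makes the replication-position check poll roughly ten times a second instead of once, so the wait resolves sooner once the small batched insert has replicated.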
@@ -192,8 +192,8 @@ class TestBaseSplitCloneResiliency(unittest.TestCase):
        'test_keyspace'],
       auto_log=True)

-  def _insert_value(self, tablet, id, msg, keyspace_id):
-    """Inserts a value in the MySQL database along with the required routing comments.
+  def _insert_values(self, tablet, id_offset, msg, keyspace_id, num_values):
+    """Inserts values in the MySQL database along with the required routing comments.

     Args:
       tablet - the Tablet instance to insert into
@@ -202,9 +202,14 @@ class TestBaseSplitCloneResiliency(unittest.TestCase):
       keyspace_id - the value of `keyspace_id` column
     """
     k = "%u" % keyspace_id
+    values_str = ''
+    for i in xrange(num_values):
+      if i != 0:
+        values_str += ','
+      values_str += '(%u, "%s", 0x%x)' % (id_offset + i, msg, keyspace_id)
     tablet.mquery('vt_test_keyspace', [
       'begin',
-      'insert into worker_test(id, msg, keyspace_id) values(%u, "%s", 0x%x) /* EMD keyspace_id:%s user_id:%u */' % (id, msg, keyspace_id, k, id),
+      'insert into worker_test(id, msg, keyspace_id) values%s /* EMD keyspace_id:%s*/' % (values_str, k),
       'commit'
       ], write=True)

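To make the batching concrete, here is a worked example with made-up arguments (id_offset=10, msg='msg-shard-0', keyspace_id=0x80, num_values=3): the loop yields one multi-row VALUES clause, so a single INSERT runs inside the begin/commit pair instead of one statement per row.

    # Equivalent of the loop above for the made-up arguments (Python 2, as in the test).
    values_str = ','.join('(%u, "%s", 0x%x)' % (10 + i, 'msg-shard-0', 0x80)
                          for i in xrange(3))
    # values_str == '(10, "msg-shard-0", 0x80),(11, "msg-shard-0", 0x80),(12, "msg-shard-0", 0x80)'
    # and the statement sent to MySQL becomes (k == "128" here):
    #   insert into worker_test(id, msg, keyspace_id)
    #   values(10, "msg-shard-0", 0x80),... /* EMD keyspace_id:128*/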
@@ -224,11 +229,12 @@ class TestBaseSplitCloneResiliency(unittest.TestCase):
     """
     shard_width = keyspace_id_range / num_shards
     shard_offsets = [i * shard_width for i in xrange(num_shards)]
-    for i in xrange(num_values):
-      for shard_num in xrange(num_shards):
-        self._insert_value(tablet, shard_offsets[shard_num] + offset + i,
-                           'msg-shard-%u' % shard_num,
-                           shard_offsets[shard_num] + i)
+    for shard_num in xrange(num_shards):
+      self._insert_values(tablet,
+                          shard_offsets[shard_num] + offset,
+                          'msg-shard-%u' % shard_num,
+                          shard_offsets[shard_num],
+                          num_values)

   def assert_shard_data_equal(self, shard_num, source_tablet, destination_tablet):
     """Asserts that a shard's data is identical on source and destination tablets.
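For a sense of the ids this generates, assume hypothetical arguments num_shards=2, keyspace_id_range=2**64, offset=0 and num_values=3 (none of these values appear in this hunk):

    # Hypothetical arguments, for illustration only (Python 2, as in the test).
    keyspace_id_range, num_shards, offset, num_values = 2 ** 64, 2, 0, 3
    shard_width = keyspace_id_range / num_shards        # 2 ** 63
    shard_offsets = [i * shard_width for i in xrange(num_shards)]
    # shard_offsets == [0, 2 ** 63]: shard 0 gets ids 0..2 and shard 1 gets
    # ids 2**63..2**63+2, i.e. one batched _insert_values call per shard.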
@@ -369,7 +375,9 @@ class TestBaseSplitCloneResiliency(unittest.TestCase):
     # for your environment (trial-and-error...)
     worker_vars = utils.poll_for_vars('vtworker', worker_port,
                                       'WorkerState == cleaning up',
-                                      condition_fn=lambda v: v.get('WorkerState') == 'cleaning up')
+                                      condition_fn=lambda v: v.get('WorkerState') == 'cleaning up',
+                                      # We know that vars should already be ready, since we read them earlier
+                                      require_vars=True)

     # Verify that we were forced to reresolve and retry.
     self.assertGreater(worker_vars['WorkerDestinationActualResolves'], 1)