Mirror of https://github.com/github/vitess-gh.git
Added worker.py to cover worker integration tests
This commit is contained in:
Parent: a110de54ab
Commit: 383e2c52e4
Makefile (6 changed lines)

@@ -100,7 +100,8 @@ medium_integration_test_files = \
   reparent.py \
   vtdb_test.py \
   vtgate_utils_test.py \
-  rowcache_invalidator.py
+  rowcache_invalidator.py \
+  worker.py

 large_integration_test_files = \
   vtgatev2_test.py \

@@ -122,7 +123,8 @@ worker_integration_test_files = \
   vertical_split.py \
   vertical_split_vtgate.py \
   initial_sharding.py \
-  initial_sharding_bytes.py
+  initial_sharding_bytes.py \
+  worker.py

 .ONESHELL:
 SHELL = /bin/bash
@@ -9,7 +9,7 @@ Let’s assume that you’ve already got a keyspace up and running, with a single

 The first thing that we need to do is add a column to the soon-to-be-sharded keyspace which will be used as the "sharding key". This column will tell Vitess which shard a particular row of data should go to. You can add the column by running an alter on the unsharded keyspace - probably by running something like:

-`vtctl ApplySchemaKeyspace -simple -sql="alter table <table name> add keyspace_id" test_keyspace`
+`vtctl ApplySchema -sql="alter table <table name> add keyspace_id" test_keyspace`

 for each table in the keyspace. Once the column is added everywhere, each row needs to be backfilled with the appropriate keyspace ID.
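As an illustration of that backfill step, here is a minimal sketch (the `user` table and `user_id` sharding column are hypothetical; the actual row-to-keyspace-ID mapping is application-specific) that populates the new column directly in MySQL:

```python
import struct
import hashlib
import MySQLdb

def keyspace_id(user_id):
  # One common choice for a KIT_UINT64 keyspace: a 64-bit hash of the
  # sharding column. Using the id directly also works if it is already
  # uniformly distributed. (Hypothetical helper, not part of this commit.)
  return struct.unpack('>Q', hashlib.md5(str(user_id)).digest()[:8])[0]

conn = MySQLdb.connect(db='vt_test_keyspace')
cursor = conn.cursor()
cursor.execute('select user_id from user where keyspace_id is null')
for (user_id,) in cursor.fetchall():
  cursor.execute('update user set keyspace_id = %s where user_id = %s',
                 (keyspace_id(user_id), user_id))
conn.commit()
cursor.close()
conn.close()
```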
@@ -151,9 +151,6 @@ func formatTableStatuses(tableStatuses []*tableStatus, startTime time.Time) ([]s

 var errExtract = regexp.MustCompile(`\(errno (\d+)\)`)

-// The amount of time we should wait before retrying ExecuteFetch calls
-var executeFetchRetryTime = (30 * time.Second)
-
 // executeFetchWithRetries will attempt to run ExecuteFetch for a single command, with a reasonably small timeout.
 // It will keep retrying the ExecuteFetch (for a finite but longer duration) if it fails due to a timeout or a
 // retriable application error.
@@ -190,6 +187,9 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
+	case errNo == "1290":
+		wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", ti, err)
+		statsRetryCounters.Add("ReadOnly", 1)
 	case errNo == "2002" || errNo == "2006":
 		wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", ti, err)
 		statsRetryCounters.Add("ConnectionError", 1)
 	case errNo == "1062":
 		if !isRetry {
 			return ti, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", ti, err)
@@ -200,7 +200,7 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
 		// Unknown error
 		return ti, err
 	}
-	t := time.NewTimer(executeFetchRetryTime)
+	t := time.NewTimer(*executeFetchRetryTime)
 	// don't leak memory if the timer isn't triggered
 	defer t.Stop()
@@ -384,7 +384,7 @@ func (scw *SplitCloneWorker) ResolveDestinationMasters() error {
 	defer scw.resolveMu.Unlock()

 	// If the last resolution was fresh enough, return it.
-	if time.Now().Sub(scw.resolveTime) < resolveTTL {
+	if time.Now().Sub(scw.resolveTime) < *resolveTTL {
 		return nil
 	}
@@ -329,7 +329,7 @@ func testSplitClone(t *testing.T, strategy string) {
 	rightRdonly.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30)

 	// Only wait 1 ms between retries, so that the test passes faster
-	executeFetchRetryTime = (1 * time.Millisecond)
+	*executeFetchRetryTime = (1 * time.Millisecond)

 	wrk.Run()
 	status := wrk.StatusAsText()
@@ -346,7 +346,7 @@ func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters() error {
 	defer vscw.resolveMu.Unlock()

 	// If the last resolution was fresh enough, return it.
-	if time.Now().Sub(vscw.resolveTime) < resolveTTL {
+	if time.Now().Sub(vscw.resolveTime) < *resolveTTL {
 		return nil
 	}
@@ -314,7 +314,7 @@ func testVerticalSplitClone(t *testing.T, strategy string) {
 	destRdonly.FakeMysqlDaemon.DbAppConnectionFactory = VerticalDestinationsFactory(t, 30)

 	// Only wait 1 ms between retries, so that the test passes faster
-	executeFetchRetryTime = (1 * time.Millisecond)
+	*executeFetchRetryTime = (1 * time.Millisecond)

 	wrk.Run()
 	status := wrk.StatusAsText()
@@ -9,6 +9,7 @@ functions for long running actions. 'vtworker' binary will use these.
 package worker

 import (
+	"flag"
 	"html/template"
 	"time"
@@ -49,11 +50,10 @@ type Resolver interface {
 	GetDestinationMaster(shardName string) (*topo.TabletInfo, error)
 }

-// Resolvers should attempt to keep the previous topo resolution cached for at
-// least this long.
-const resolveTTL = 15 * time.Second
-
 var (
+	resolveTTL            = flag.Duration("resolve_ttl", 15*time.Second, "Amount of time that a topo resolution can be cached for")
+	executeFetchRetryTime = flag.Duration("executefetch_retry_time", 30*time.Second, "Amount of time we should wait before retrying ExecuteFetch calls")
+
 	statsState = stats.NewString("WorkerState")
 	// the number of times that the worker attempts to reresolve the masters
 	statsDestinationAttemptedResolves = stats.NewInt("WorkerDestinationAttemptedResolves")
@@ -75,6 +75,9 @@
 		},
 		{
 			"File": "resharding.py"
+		},
+		{
+			"File": "worker.py"
 		}
 	]
 }
@@ -240,7 +240,7 @@ class Tablet(object):
     rows = self.mquery('', 'show databases')
     for row in rows:
       dbname = row[0]
-      if dbname in ['information_schema', '_vt', 'mysql']:
+      if dbname in ['information_schema', 'mysql']:
         continue
       self.drop_db(dbname)
test/utils.py (101 changed lines)

@@ -20,6 +20,7 @@ import environment

 from vtctl import vtctl_client
 from mysql_flavor import set_mysql_flavor
+from mysql_flavor import mysql_flavor
 from protocols_flavor import set_protocols_flavor, protocols_flavor
 from topo_flavor.server import set_topo_server_flavor
@@ -79,7 +80,13 @@ def set_options(opts):

 # main executes the test classes contained in the passed module, or
 # __main__ if empty.
-def main(mod=None):
+def main(mod=None, test_options=None):
+  """The replacement main method, which parses args and runs tests.
+
+  Args:
+    test_options - a function which adds OptionParser options that are specific
+      to a test file.
+  """
   if mod == None:
     mod = sys.modules['__main__']

@@ -87,6 +94,8 @@ def main(mod=None):

   parser = optparse.OptionParser(usage="usage: %prog [options] [test_names]")
   add_options(parser)
+  if test_options:
+    test_options(parser)
   (options, args) = parser.parse_args()

   if options.verbose == 0:
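The new worker.py at the bottom of this commit shows the intended use of the test_options hook: the test file registers its own flags and then hands the hook to utils.main().

```python
def add_test_options(parser):
  parser.add_option('--num_insert_rows', type="int", default=3000,
                    help="The number of rows, per shard, that we should insert before resharding for this test.")

if __name__ == '__main__':
  utils.main(test_options=add_test_options)
```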
@@ -256,12 +265,6 @@ def wait_procs(proc_list, raise_on_error=True):
       if raise_on_error:
         raise CalledProcessError(proc.returncode, ' '.join(proc.args))

-def run_procs(cmds, raise_on_error=True):
-  procs = []
-  for cmd in cmds:
-    procs.append(run_bg(cmd))
-  wait_procs(procs, raise_on_error=raise_on_error)
-
 def validate_topology(ping_tablets=False):
   if ping_tablets:
     run_vtctl(['Validate', '-ping-tablets'])
@@ -325,6 +328,40 @@ def wait_for_vars(name, port, var=None):
     break
   timeout = wait_step('waiting for /debug/vars of %s' % name, timeout)

+def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
+  """Polls for debug variables to exist, or match specific conditions, within a timeout.
+
+  This function polls in a tight loop, with no sleeps. This is useful for
+  variables that are expected to be short-lived (e.g., a 'Done' state
+  immediately before a process exits).
+
+  Args:
+    name - the name of the process that we're trying to poll vars from.
+    port - the port number that we should poll for variables.
+    condition_msg - string describing the conditions that we're polling for,
+      used for error messaging.
+    timeout - number of seconds that we should attempt to poll for.
+    condition_fn - a function that takes the debug vars dict as input, and
+      returns a truthy value if it matches the success conditions.
+
+  Raises:
+    TestError, if the conditions aren't met within the given timeout
+
+  Returns:
+    dict of debug variables
+  """
+  start_time = time.time()
+  while True:
+    if (time.time() - start_time) >= timeout:
+      raise TestError('Timed out polling for vars from %s; condition "%s" not met' % (name, condition_msg))
+    _vars = get_vars(port)
+    if _vars is None:
+      continue
+    if condition_fn is None:
+      return _vars
+    elif condition_fn(_vars):
+      return _vars
+
 def apply_vschema(vschema):
   fname = os.path.join(environment.tmproot, "vschema.json")
   with open(fname, "w") as f:
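For reference, the new worker.py uses this helper to block until the vtworker reaches a given state (worker_port comes from run_vtworker_bg, shown later in this diff):

```python
worker_vars = utils.poll_for_vars('vtworker', worker_port,
                       'WorkerState == cleaning up',
                       condition_fn=lambda v: v.get('WorkerState') == 'cleaning up')
```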
@@ -345,6 +382,23 @@ def wait_for_tablet_type(tablet_alias, expected_type, timeout=10):
       timeout
   )

+def wait_for_replication_pos(tablet_a, tablet_b, timeout=60.0):
+  """Waits for tablet B to catch up to the replication position of tablet A.
+
+  If the replication position does not catch up within timeout seconds, it will
+  raise a TestError.
+  """
+  replication_pos_a = mysql_flavor().master_position(tablet_a)
+  while True:
+    replication_pos_b = mysql_flavor().master_position(tablet_b)
+    if mysql_flavor().position_at_least(replication_pos_b, replication_pos_a):
+      break
+    timeout = wait_step(
+      "%s's replication position to catch up %s's; currently at: %s, waiting to catch up to: %s" % (
+        tablet_b.tablet_alias, tablet_a.tablet_alias, replication_pos_b, replication_pos_a),
+      timeout
+    )
+
 # vtgate helpers, assuming it always restarts on the same port
 def vtgate_start(vtport=None, cell='test_nj', retry_delay=1, retry_count=2,
                  topo_impl=None, tablet_bson_encrypted=False, cache_ttl='1s',
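The new worker.py uses this helper to make sure the source rdonly tablet has caught up to the master before starting the copy:

```python
utils.wait_for_replication_pos(shard_master, shard_rdonly1)
```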
@@ -463,10 +517,37 @@ def run_vtctl_json(clargs):

 # vtworker helpers
 def run_vtworker(clargs, log_level='', auto_log=False, expect_fail=False, **kwargs):
   """Runs a vtworker process, returning the stdout and stderr"""
+  cmd, _ = _get_vtworker_cmd(clargs, log_level, auto_log)
+  if expect_fail:
+    return run_fail(cmd, **kwargs)
+  return run(cmd, **kwargs)
+
+def run_vtworker_bg(clargs, log_level='', auto_log=False, **kwargs):
+  """Starts a background vtworker process.
+
+  Returns:
+    proc - process returned by subprocess.Popen
+    port - int with the port number that the vtworker is running with
+  """
+  cmd, port = _get_vtworker_cmd(clargs, log_level, auto_log)
+  return run_bg(cmd, **kwargs), port
+
+def _get_vtworker_cmd(clargs, log_level='', auto_log=False):
+  """Assembles the command that is needed to run a vtworker.
+
+  Returns:
+    cmd - list of cmd arguments, can be passed to any `run`-like functions
+    port - int with the port number that the vtworker is running with
+  """
+  port = environment.reserve_ports(1)
   args = environment.binary_args('vtworker') + [
     '-log_dir', environment.vtlogroot,
     '-min_healthy_rdonly_endpoints', '1',
-    '-port', str(environment.reserve_ports(1))]
+    '-port', str(port),
+    '-resolve_ttl', '2s',
+    '-executefetch_retry_time', '1s',
+    ]
   args.extend(environment.topo_server().flags())
   args.extend(protocols_flavor().tablet_manager_protocol_flags())
@@ -481,9 +562,7 @@ def run_vtworker(clargs, log_level='', auto_log=False, expect_fail=False, **kwar
     args.append('--stderrthreshold=%s' % log_level)

   cmd = args + clargs
-  if expect_fail:
-    return run_fail(cmd, **kwargs)
-  return run(cmd, **kwargs)
+  return cmd, port

 # vtclient2 helpers
 # driver is one of:
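Together these let a test start a vtworker in the background and poll its /debug/vars while it runs, as the new worker.py does (abbreviated from its SplitClone invocation below):

```python
worker_proc, worker_port = utils.run_vtworker_bg(['--cell', 'test_nj',
                                'SplitClone',
                                '--strategy=-populate_blp_checkpoint',
                                'test_keyspace/0'],
                               auto_log=True)
```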
worker.py (new file, 438 lines)

@@ -0,0 +1,438 @@
#!/usr/bin/env python
#
# Copyright 2013, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
"""
Tests the robustness and resiliency of vtworkers.
"""

import logging
import unittest
from collections import namedtuple

from vtdb import keyrange_constants

import environment
import utils
import tablet


KEYSPACE_ID_TYPE = keyrange_constants.KIT_UINT64


class ShardTablets(namedtuple('ShardTablets', 'master replicas rdonlys')):
  """ShardTablets is a container for all the tablet.Tablets of a shard.

  `master` should be a single Tablet, while `replicas` and `rdonlys` should be
  lists of Tablets of the appropriate types.
  """

  @property
  def all_tablets(self):
    """Returns a list of all the tablets of the shard.

    Does not guarantee any ordering on the returned tablets.
    """
    return [self.master] + self.replicas + self.rdonlys

  @property
  def replica(self):
    """Returns the first replica Tablet instance for the shard, or None."""
    if self.replicas:
      return self.replicas[0]
    else:
      return None

  @property
  def rdonly(self):
    """Returns the first rdonly Tablet instance for the shard, or None."""
    if self.rdonlys:
      return self.rdonlys[0]
    else:
      return None

# initial shard, covers everything
shard_master = tablet.Tablet()
shard_replica = tablet.Tablet()
shard_rdonly1 = tablet.Tablet()

# split shards
# range "" - 80
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_rdonly1 = tablet.Tablet()
# range 80 - ""
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
shard_1_rdonly1 = tablet.Tablet()

shard_tablets = ShardTablets(shard_master, [shard_replica], [shard_rdonly1])
shard_0_tablets = ShardTablets(shard_0_master, [shard_0_replica], [shard_0_rdonly1])
shard_1_tablets = ShardTablets(shard_1_master, [shard_1_replica], [shard_1_rdonly1])


def init_keyspace():
  """Creates a `test_keyspace` keyspace with a sharding key."""
  utils.run_vtctl(['CreateKeyspace', '-sharding_column_name', 'keyspace_id',
                   '-sharding_column_type', KEYSPACE_ID_TYPE, 'test_keyspace'])


def setUpModule():
  try:
    environment.topo_server().setup()

    setup_procs = [
        shard_master.init_mysql(),
        shard_replica.init_mysql(),
        shard_rdonly1.init_mysql(),
        shard_0_master.init_mysql(),
        shard_0_replica.init_mysql(),
        shard_0_rdonly1.init_mysql(),
        shard_1_master.init_mysql(),
        shard_1_replica.init_mysql(),
        shard_1_rdonly1.init_mysql(),
        ]
    utils.wait_procs(setup_procs)
    init_keyspace()
  except:
    tearDownModule()
    raise


def tearDownModule():
  if utils.options.skip_teardown:
    return

  teardown_procs = [
      shard_master.teardown_mysql(),
      shard_replica.teardown_mysql(),
      shard_rdonly1.teardown_mysql(),
      shard_0_master.teardown_mysql(),
      shard_0_replica.teardown_mysql(),
      shard_0_rdonly1.teardown_mysql(),
      shard_1_master.teardown_mysql(),
      shard_1_replica.teardown_mysql(),
      shard_1_rdonly1.teardown_mysql(),
      ]
  utils.wait_procs(teardown_procs, raise_on_error=False)

  environment.topo_server().teardown()
  utils.kill_sub_processes()
  utils.remove_tmp_files()

  shard_master.remove_tree()
  shard_replica.remove_tree()
  shard_rdonly1.remove_tree()
  shard_0_master.remove_tree()
  shard_0_replica.remove_tree()
  shard_0_rdonly1.remove_tree()
  shard_1_master.remove_tree()
  shard_1_replica.remove_tree()
  shard_1_rdonly1.remove_tree()

class TestBaseSplitCloneResiliency(unittest.TestCase):
  """Tests that the SplitClone worker is resilient to particular failures."""

  def run_shard_tablets(self, shard_name, shard_tablets, create_db=True, create_table=True, wait_state='SERVING'):
    """Handles all the necessary work for initially running a shard's tablets.

    This encompasses the following steps:
      1. InitTablet for the appropriate tablets and types
      2. (optional) Create db
      3. Starting vttablets
      4. Waiting for the appropriate vttablet state
      5. Force reparent to the master tablet
      6. RebuildKeyspaceGraph
      7. (optional) Running initial schema setup

    Args:
      shard_name - the name of the shard to start tablets in
      shard_tablets - an instance of ShardTablets for the given shard
      wait_state - string, the vttablet state that we should wait for
      create_db - boolean, True iff we should create a db on the tablets
      create_table - boolean, True iff we should create a table on the tablets
    """
    shard_tablets.master.init_tablet('master', 'test_keyspace', shard_name)
    for tablet in shard_tablets.replicas:
      tablet.init_tablet('replica', 'test_keyspace', shard_name)
    for tablet in shard_tablets.rdonlys:
      tablet.init_tablet('rdonly', 'test_keyspace', shard_name)

    # Start tablets (and possibly create databases)
    for tablet in shard_tablets.all_tablets:
      if create_db:
        tablet.create_db('vt_test_keyspace')
      tablet.start_vttablet(wait_for_state=None)

    # Wait for tablet state to change after starting all tablets. This allows
    # us to start all tablets at once, instead of sequentially waiting.
    for tablet in shard_tablets.all_tablets:
      tablet.wait_for_vttablet_state(wait_state)

    # Reparent to choose an initial master
    utils.run_vtctl(['InitShardMaster', 'test_keyspace/%s' % shard_name,
                     shard_tablets.master.tablet_alias], auto_log=True)
    utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)

    create_table_sql = (
        'create table worker_test('
        'id bigint unsigned,'
        'msg varchar(64),'
        'keyspace_id bigint(20) unsigned not null,'
        'primary key (id),'
        'index by_msg (msg)'
        ') Engine=InnoDB'
    )

    if create_table:
      utils.run_vtctl(['ApplySchema',
                       '-sql=' + create_table_sql,
                       'test_keyspace'],
                      auto_log=True)

  def _insert_value(self, tablet, id, msg, keyspace_id):
    """Inserts a value in the MySQL database along with the required routing comments.

    Args:
      tablet - the Tablet instance to insert into
      id - the value of `id` column
      msg - the value of `msg` column
      keyspace_id - the value of `keyspace_id` column
    """
    k = "%u" % keyspace_id
    tablet.mquery('vt_test_keyspace', [
        'begin',
        'insert into worker_test(id, msg, keyspace_id) values(%u, "%s", 0x%x) /* EMD keyspace_id:%s user_id:%u */' % (id, msg, keyspace_id, k, id),
        'commit'
        ], write=True)

  def insert_values(self, tablet, num_values, num_shards, offset=0, keyspace_id_range=2**64):
    """Inserts simple values, one for each potential shard.

    Each row is given a message that contains the shard number, so we can easily
    verify that the source and destination shards have the same data.

    Args:
      tablet - the Tablet instance to insert into
      num_values - the number of values to insert
      num_shards - the number of shards that we expect to have
      offset - amount that we should offset the `id`s by. This is useful for
        inserting values multiple times.
      keyspace_id_range - the number of distinct values that the keyspace id can have
    """
    shard_width = keyspace_id_range / num_shards
    shard_offsets = [i * shard_width for i in xrange(num_shards)]
    for i in xrange(num_values):
      for shard_num in xrange(num_shards):
        self._insert_value(tablet, shard_offsets[shard_num] + offset + i,
                           'msg-shard-%u' % shard_num,
                           shard_offsets[shard_num] + i)
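    # With the defaults used in this test (num_shards=2, keyspace_id_range=2**64),
    # shard_width is 2**63, so shard_offsets is [0, 9223372036854775808]: rows
    # are split at keyspace_id 0x8000000000000000, matching the '-80' and '80-'
    # destination shards.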

  def assert_shard_data_equal(self, shard_num, source_tablet, destination_tablet):
    """Asserts that a shard's data is identical on source and destination tablets.

    Args:
      shard_num - the shard number of the shard that we want to verify the data of
      source_tablet - Tablet instance of the source shard
      destination_tablet - Tablet instance of the destination shard
    """
    select_query = 'select * from worker_test where msg="msg-shard-%s" order by id asc' % shard_num

    source_rows = source_tablet.mquery('vt_test_keyspace', select_query)
    destination_rows = destination_tablet.mquery('vt_test_keyspace', select_query)
    self.assertEqual(source_rows, destination_rows)

  def run_split_diff(self, keyspace_shard, source_tablets, destination_tablets):
    """Runs a vtworker SplitDiff on the given keyspace/shard, and then sets all
    former rdonly slaves back to rdonly.

    Args:
      keyspace_shard - keyspace/shard to run SplitDiff on (string)
      source_tablets - ShardTablets instance for the source shard
      destination_tablets - ShardTablets instance for the destination shard
    """
    logging.debug("Running vtworker SplitDiff for %s" % keyspace_shard)
    stdout, stderr = utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff',
                                         keyspace_shard], auto_log=True)

    for shard_tablets in (source_tablets, destination_tablets):
      for tablet in shard_tablets.rdonlys:
        utils.run_vtctl(['ChangeSlaveType', tablet.tablet_alias, 'rdonly'],
                        auto_log=True)

  def setUp(self):
    """Creates the necessary shards, starts the tablets, and inserts some data."""
    self.run_shard_tablets('0', shard_tablets)
    # create the split shards
    self.run_shard_tablets('-80', shard_0_tablets, create_db=False,
                           create_table=False, wait_state='NOT_SERVING')
    self.run_shard_tablets('80-', shard_1_tablets, create_db=False,
                           create_table=False, wait_state='NOT_SERVING')

    # Copy the schema to the destination shards
    for keyspace_shard in ('test_keyspace/-80', 'test_keyspace/80-'):
      utils.run_vtctl(['CopySchemaShard',
                       '--exclude_tables', 'unrelated',
                       shard_rdonly1.tablet_alias,
                       keyspace_shard],
                      auto_log=True)

    logging.debug("Start inserting initial data: %s rows", utils.options.num_insert_rows)
    self.insert_values(shard_master, utils.options.num_insert_rows, 2)
    logging.debug("Done inserting initial data, waiting for replication to catch up")
    utils.wait_for_replication_pos(shard_master, shard_rdonly1)
    logging.debug("Replication on source rdonly tablet is caught up")

  def tearDown(self):
    """Tries to do the minimum to reset topology and tablets to their initial states.

    When benchmarked, this seemed to take around 30% of the time of
    (setUpModule + tearDownModule).
    """
    for shard_tablet in [shard_tablets, shard_0_tablets, shard_1_tablets]:
      for tablet in shard_tablet.all_tablets:
        tablet.clean_dbs()
        tablet.scrap(force=True, skip_rebuild=True)
        utils.run_vtctl(['DeleteTablet', tablet.tablet_alias], auto_log=True)
        tablet.kill_vttablet()
    utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
    for shard in ['0', '-80', '80-']:
      utils.run_vtctl(['DeleteShard', 'test_keyspace/%s' % shard], auto_log=True)

  def verify_successful_worker_copy_with_reparent(self, mysql_down=False):
    """Verifies that vtworker can successfully copy data for a SplitClone.

    Order of operations:
    1. Run a background vtworker
    2. Wait until the worker successfully resolves the destination masters.
    3. Reparent the destination tablets
    4. Wait until the vtworker copy is finished
    5. Verify that the worker was forced to reresolve topology and retry writes
      due to the reparent.
    6. Verify that the data was copied successfully to both new shards

    Args:
      mysql_down - boolean, True iff we expect the MySQL instances on the
        destination masters to be down.

    Raises:
      AssertionError if things didn't go as expected.
    """
    worker_proc, worker_port = utils.run_vtworker_bg(['--cell', 'test_nj',
                                    'SplitClone',
                                    '--source_reader_count', '1',
                                    '--destination_pack_count', '1',
                                    '--destination_writer_count', '1',
                                    '--strategy=-populate_blp_checkpoint',
                                    'test_keyspace/0'],
                                   auto_log=True)

    if mysql_down:
      # If MySQL is down, we wait until resolving at least twice (to verify that
      # we do reresolve and retry due to MySQL being down).
      worker_vars = utils.poll_for_vars('vtworker', worker_port,
                             'WorkerDestinationActualResolves >= 2',
                             condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 2)
      self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
                          "expected vtworker to retry, but it didn't")
      logging.debug("Worker has resolved at least twice, starting reparent now")

      # Original masters have no running MySQL, so need to force the reparent
      utils.run_vtctl(['EmergencyReparentShard', 'test_keyspace/-80',
                       shard_0_replica.tablet_alias], auto_log=True)
      utils.run_vtctl(['EmergencyReparentShard', 'test_keyspace/80-',
                       shard_1_replica.tablet_alias], auto_log=True)

    else:
      utils.poll_for_vars('vtworker', worker_port,
                          'WorkerDestinationActualResolves >= 1',
                          condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 1)
      logging.debug("Worker has resolved at least once, starting reparent now")

      utils.run_vtctl(['PlannedReparentShard', 'test_keyspace/-80',
                       shard_0_replica.tablet_alias], auto_log=True)
      utils.run_vtctl(['PlannedReparentShard', 'test_keyspace/80-',
                       shard_1_replica.tablet_alias], auto_log=True)

    logging.debug("Polling for worker state")
    # There are a couple of race conditions around this, that we need to be careful of:
    # 1. It's possible for the reparent step to take so long that the worker will
    #   actually finish before we get to the polling step. To work around this,
    #   the test takes a parameter to increase the number of rows that the worker
    #   has to copy (with the idea being to slow the worker down).
    # 2. If the worker has a huge number of rows to copy, it's possible for the
    #   polling to timeout before the worker has finished copying the data.
    #
    # You should choose a value for num_insert_rows, such that this test passes
    # for your environment (trial-and-error...)
    worker_vars = utils.poll_for_vars('vtworker', worker_port,
                           'WorkerState == cleaning up',
                           condition_fn=lambda v: v.get('WorkerState') == 'cleaning up')

    # Verify that we were forced to reresolve and retry.
    self.assertGreater(worker_vars['WorkerDestinationActualResolves'], 1)
    self.assertGreater(worker_vars['WorkerDestinationAttemptedResolves'], 1)
    self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
                        "expected vtworker to retry, but it didn't")

    utils.wait_procs([worker_proc])

    utils.run_vtctl(['ChangeSlaveType', shard_rdonly1.tablet_alias, 'rdonly'],
                    auto_log=True)

    # Make sure that everything is caught up to the same replication point
    self.run_split_diff('test_keyspace/-80', shard_tablets, shard_0_tablets)
    self.run_split_diff('test_keyspace/80-', shard_tablets, shard_1_tablets)

    self.assert_shard_data_equal(0, shard_master, shard_0_tablets.replica)
    self.assert_shard_data_equal(1, shard_master, shard_1_tablets.replica)

class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency):

  def test_reparent_during_worker_copy(self):
    """This test simulates a destination reparent during a worker SplitClone copy.

    The SplitClone command should be able to gracefully handle the reparent and
    end up with the correct data on the destination.

    Note: this test has a small possibility of flaking, due to the timing issues
    involved. It's possible for the worker to finish the copy step before the
    reparent succeeds, in which case there are assertions that will fail. This
    seems better than having the test silently pass.
    """
    self.verify_successful_worker_copy_with_reparent()


class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency):

  def setUp(self):
    """Shuts down MySQL on the destination masters (in addition to the base setup)."""
    logging.debug("Starting base setup for MysqlDownDuringWorkerCopy")
    super(TestMysqlDownDuringWorkerCopy, self).setUp()
    logging.debug("Starting MysqlDownDuringWorkerCopy-specific setup")
    utils.wait_procs([shard_0_master.shutdown_mysql(),
                      shard_1_master.shutdown_mysql()])
    logging.debug("Finished MysqlDownDuringWorkerCopy-specific setup")

  def tearDown(self):
    """Restarts the MySQL processes that were killed during the setup."""
    logging.debug("Starting MysqlDownDuringWorkerCopy-specific tearDown")
    utils.wait_procs([shard_0_master.start_mysql(),
                      shard_1_master.start_mysql()])
    logging.debug("Finished MysqlDownDuringWorkerCopy-specific tearDown")
    super(TestMysqlDownDuringWorkerCopy, self).tearDown()
    logging.debug("Finished base tearDown for MysqlDownDuringWorkerCopy")

  def test_mysql_down_during_worker_copy(self):
    """This test simulates MySQL being down on the destination masters."""
    self.verify_successful_worker_copy_with_reparent(mysql_down=True)


def add_test_options(parser):
  parser.add_option('--num_insert_rows', type="int", default=3000,
                    help="The number of rows, per shard, that we should insert before resharding for this test.")

if __name__ == '__main__':
  utils.main(test_options=add_test_options)