Change resharding.py test to use a sharding key column that isn't keyspace_id

This commit is contained in:
Ammar Aijazi 2016-03-22 12:31:51 -07:00
Родитель 7008624386
Коммит 67e5183b9d
2 изменённых файлов: 61 добавлений и 48 удалений

Просмотреть файл

@ -119,23 +119,23 @@ def tearDownModule():
# every 1/5s will update its value with the current timestamp # every 1/5s will update its value with the current timestamp
class InsertThread(threading.Thread): class InsertThread(threading.Thread):
def __init__(self, tablet_obj, object_name, user_id, keyspace_id): def __init__(self, tablet_obj, object_name, user_id, custom_sharding_key):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.tablet = tablet_obj self.tablet = tablet_obj
self.object_name = object_name self.object_name = object_name
self.user_id = user_id self.user_id = user_id
self.keyspace_id = keyspace_id self.custom_sharding_key = custom_sharding_key
self.str_keyspace_id = utils.uint64_to_hex(keyspace_id) self.str_custom_sharding_key = utils.uint64_to_hex(custom_sharding_key)
self.done = False self.done = False
self.tablet.mquery( self.tablet.mquery(
'vt_test_keyspace', 'vt_test_keyspace',
['begin', ['begin',
'insert into timestamps(name, time_milli, keyspace_id) ' 'insert into timestamps(name, time_milli, custom_sharding_key) '
"values('%s', %d, 0x%x) " "values('%s', %d, 0x%x) "
'/* vtgate:: keyspace_id:%s */ /* user_id:%d */' % '/* vtgate:: keyspace_id:%s */ /* user_id:%d */' %
(self.object_name, long(time.time() * 1000), self.keyspace_id, (self.object_name, long(time.time() * 1000), self.custom_sharding_key,
self.str_keyspace_id, self.user_id), self.str_custom_sharding_key, self.user_id),
'commit'], 'commit'],
write=True, user='vt_app') write=True, user='vt_app')
self.start() self.start()
@ -149,7 +149,7 @@ class InsertThread(threading.Thread):
'update timestamps set time_milli=%d ' 'update timestamps set time_milli=%d '
'where name="%s" /* vtgate:: keyspace_id:%s */ /* user_id:%d */' % 'where name="%s" /* vtgate:: keyspace_id:%s */ /* user_id:%d */' %
(long(time.time() * 1000), self.object_name, (long(time.time() * 1000), self.object_name,
self.str_keyspace_id, self.user_id), self.str_custom_sharding_key, self.user_id),
'commit'], 'commit'],
write=True, user='vt_app') write=True, user='vt_app')
time.sleep(0.2) time.sleep(0.2)
@ -203,17 +203,18 @@ class TestResharding(unittest.TestCase):
create_table_template = '''create table %s( create_table_template = '''create table %s(
id bigint auto_increment, id bigint auto_increment,
msg varchar(64), msg varchar(64),
keyspace_id ''' + t + ''' not null, custom_sharding_key ''' + t + ''' not null,
primary key (id), primary key (id),
index by_msg (msg) index by_msg (msg)
) Engine=InnoDB''' ) Engine=InnoDB'''
create_view_template = ( create_view_template = (
'create view %s(id, msg, keyspace_id) as select id, msg, keyspace_id ' 'create view %s'
'(id, msg, custom_sharding_key) as select id, msg, custom_sharding_key '
'from %s') 'from %s')
create_timestamp_table = '''create table timestamps( create_timestamp_table = '''create table timestamps(
name varchar(64), name varchar(64),
time_milli bigint(20) unsigned not null, time_milli bigint(20) unsigned not null,
keyspace_id ''' + t + ''' not null, custom_sharding_key ''' + t + ''' not null,
primary key (name) primary key (name)
) Engine=InnoDB''' ) Engine=InnoDB'''
create_unrelated_table = '''create table unrelated( create_unrelated_table = '''create table unrelated(
@ -244,41 +245,42 @@ primary key (name)
# _insert_value inserts a value in the MySQL database along with the comments # _insert_value inserts a value in the MySQL database along with the comments
# required for routing. # required for routing.
def _insert_value(self, tablet_obj, table, mid, msg, keyspace_id): def _insert_value(self, tablet_obj, table, mid, msg, custom_sharding_key):
k = utils.uint64_to_hex(keyspace_id) k = utils.uint64_to_hex(custom_sharding_key)
tablet_obj.mquery( tablet_obj.mquery(
'vt_test_keyspace', 'vt_test_keyspace',
['begin', ['begin',
'insert into %s(id, msg, keyspace_id) ' 'insert into %s(id, msg, custom_sharding_key) '
'values(%d, "%s", 0x%x) /* vtgate:: keyspace_id:%s */ ' 'values(%d, "%s", 0x%x) /* vtgate:: keyspace_id:%s */ '
'/* user_id:%d */' % '/* user_id:%d */' %
(table, mid, msg, keyspace_id, k, mid), (table, mid, msg, custom_sharding_key, k, mid),
'commit'], 'commit'],
write=True) write=True)
def _get_value(self, tablet_obj, table, mid): def _get_value(self, tablet_obj, table, mid):
return tablet_obj.mquery( return tablet_obj.mquery(
'vt_test_keyspace', 'vt_test_keyspace',
'select id, msg, keyspace_id from %s where id=%d' % (table, mid)) 'select id, msg, custom_sharding_key from %s where id=%d' %
(table, mid))
def _check_value(self, tablet_obj, table, mid, msg, keyspace_id, def _check_value(self, tablet_obj, table, mid, msg, custom_sharding_key,
should_be_here=True): should_be_here=True):
result = self._get_value(tablet_obj, table, mid) result = self._get_value(tablet_obj, table, mid)
if keyspace_id_type == keyrange_constants.KIT_BYTES: if keyspace_id_type == keyrange_constants.KIT_BYTES:
fmt = '%s' fmt = '%s'
keyspace_id = pack_keyspace_id(keyspace_id) custom_sharding_key = pack_keyspace_id(custom_sharding_key)
else: else:
fmt = '%x' fmt = '%x'
if should_be_here: if should_be_here:
self.assertEqual(result, ((mid, msg, keyspace_id),), self.assertEqual(result, ((mid, msg, custom_sharding_key),),
('Bad row in tablet %s for id=%d, keyspace_id=' + ('Bad row in tablet %s for id=%d, custom_sharding_key=' +
fmt + ', row=%s') % (tablet_obj.tablet_alias, mid, fmt + ', row=%s') % (tablet_obj.tablet_alias, mid,
keyspace_id, str(result))) custom_sharding_key, str(result)))
else: else:
self.assertEqual( self.assertEqual(
len(result), 0, len(result), 0,
('Extra row in tablet %s for id=%d, keyspace_id=' + ('Extra row in tablet %s for id=%d, custom_sharding_key=' +
fmt + ': %s') % (tablet_obj.tablet_alias, mid, keyspace_id, fmt + ': %s') % (tablet_obj.tablet_alias, mid, custom_sharding_key,
str(result))) str(result)))
# _is_value_present_and_correct tries to read a value. # _is_value_present_and_correct tries to read a value.
@ -286,18 +288,19 @@ primary key (name)
# if not correct, it will self.fail. # if not correct, it will self.fail.
# if not there, it will return False. # if not there, it will return False.
def _is_value_present_and_correct( def _is_value_present_and_correct(
self, tablet_obj, table, mid, msg, keyspace_id): self, tablet_obj, table, mid, msg, custom_sharding_key):
result = self._get_value(tablet_obj, table, mid) result = self._get_value(tablet_obj, table, mid)
if not result: if not result:
return False return False
if keyspace_id_type == keyrange_constants.KIT_BYTES: if keyspace_id_type == keyrange_constants.KIT_BYTES:
fmt = '%s' fmt = '%s'
keyspace_id = pack_keyspace_id(keyspace_id) custom_sharding_key = pack_keyspace_id(custom_sharding_key)
else: else:
fmt = '%x' fmt = '%x'
self.assertEqual(result, ((mid, msg, keyspace_id),), self.assertEqual(result, ((mid, msg, custom_sharding_key),),
('Bad row in tablet %s for id=%d, keyspace_id=' + fmt) % ( ('Bad row in tablet %s for id=%d, '
tablet_obj.tablet_alias, mid, keyspace_id)) 'custom_sharding_key=' + fmt) % (
tablet_obj.tablet_alias, mid, custom_sharding_key))
return True return True
def _insert_startup_values(self): def _insert_startup_values(self):
@ -433,23 +436,23 @@ primary key (name)
def _test_keyrange_constraints(self): def _test_keyrange_constraints(self):
with self.assertRaisesRegexp( with self.assertRaisesRegexp(
Exception, '.*enforce keyspace_id range.*'): Exception, '.*enforce custom_sharding_key range.*'):
shard_0_master.execute( shard_0_master.execute(
"insert into resharding1(id, msg, keyspace_id) " "insert into resharding1(id, msg, custom_sharding_key) "
" values(1, 'msg', :keyspace_id)", " values(1, 'msg', :custom_sharding_key)",
bindvars={'keyspace_id': 0x9000000000000000}, bindvars={'custom_sharding_key': 0x9000000000000000},
) )
with self.assertRaisesRegexp( with self.assertRaisesRegexp(
Exception, '.*enforce keyspace_id range.*'): Exception, '.*enforce custom_sharding_key range.*'):
shard_0_master.execute( shard_0_master.execute(
"update resharding1 set msg = 'msg' where id = 1", "update resharding1 set msg = 'msg' where id = 1",
bindvars={'keyspace_id': 0x9000000000000000}, bindvars={'custom_sharding_key': 0x9000000000000000},
) )
with self.assertRaisesRegexp( with self.assertRaisesRegexp(
Exception, '.*enforce keyspace_id range.*'): Exception, '.*enforce custom_sharding_key range.*'):
shard_0_master.execute( shard_0_master.execute(
'delete from resharding1 where id = 1', 'delete from resharding1 where id = 1',
bindvars={'keyspace_id': 0x9000000000000000}, bindvars={'custom_sharding_key': 0x9000000000000000},
) )
def test_resharding(self): def test_resharding(self):
@ -462,10 +465,10 @@ primary key (name)
'--split_shard_count', '2', '--split_shard_count', '2',
'test_keyspace']) 'test_keyspace'])
utils.run_vtctl(['SetKeyspaceShardingInfo', 'test_keyspace', utils.run_vtctl(['SetKeyspaceShardingInfo', 'test_keyspace',
'keyspace_id', 'uint64'], expect_fail=True) 'custom_sharding_key', 'uint64'], expect_fail=True)
utils.run_vtctl(['SetKeyspaceShardingInfo', utils.run_vtctl(['SetKeyspaceShardingInfo',
'-force', '-split_shard_count', '4', '-force', '-split_shard_count', '4',
'test_keyspace', 'keyspace_id', keyspace_id_type]) 'test_keyspace', 'custom_sharding_key', keyspace_id_type])
shard_0_master.init_tablet('master', 'test_keyspace', '-80') shard_0_master.init_tablet('master', 'test_keyspace', '-80')
shard_0_replica.init_tablet('replica', 'test_keyspace', '-80') shard_0_replica.init_tablet('replica', 'test_keyspace', '-80')
@ -550,7 +553,8 @@ primary key (name)
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-\n' 'Partitions(rdonly): -80 80-\n'
'Partitions(replica): -80 80-\n', 'Partitions(replica): -80 80-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
# the worker will do everything. We test with source_reader_count=10 # the worker will do everything. We test with source_reader_count=10
# (down from default=20) as connection pool is not big enough for 20. # (down from default=20) as connection pool is not big enough for 20.
@ -684,12 +688,14 @@ primary key (name)
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-\n', 'Partitions(replica): -80 80-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.check_srv_keyspace('test_ny', 'test_keyspace', utils.check_srv_keyspace('test_ny', 'test_keyspace',
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-\n' 'Partitions(rdonly): -80 80-\n'
'Partitions(replica): -80 80-\n', 'Partitions(replica): -80 80-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.check_tablet_query_service(self, shard_0_ny_rdonly, True, False) utils.check_tablet_query_service(self, shard_0_ny_rdonly, True, False)
utils.check_tablet_query_service(self, shard_1_ny_rdonly, True, False) utils.check_tablet_query_service(self, shard_1_ny_rdonly, True, False)
utils.check_tablet_query_service(self, shard_1_rdonly1, False, True) utils.check_tablet_query_service(self, shard_1_rdonly1, False, True)
@ -701,12 +707,14 @@ primary key (name)
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-\n', 'Partitions(replica): -80 80-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.check_srv_keyspace('test_ny', 'test_keyspace', utils.check_srv_keyspace('test_ny', 'test_keyspace',
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-\n', 'Partitions(replica): -80 80-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.check_tablet_query_service(self, shard_0_ny_rdonly, True, False) utils.check_tablet_query_service(self, shard_0_ny_rdonly, True, False)
utils.check_tablet_query_service(self, shard_1_ny_rdonly, False, True) utils.check_tablet_query_service(self, shard_1_ny_rdonly, False, True)
utils.check_tablet_query_service(self, shard_1_rdonly1, False, True) utils.check_tablet_query_service(self, shard_1_rdonly1, False, True)
@ -720,7 +728,8 @@ primary key (name)
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-c0 c0-\n', 'Partitions(replica): -80 80-c0 c0-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.check_tablet_query_service(self, shard_1_slave2, False, True) utils.check_tablet_query_service(self, shard_1_slave2, False, True)
# move replica back and forth # move replica back and forth
@ -739,7 +748,8 @@ primary key (name)
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-\n', 'Partitions(replica): -80 80-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'replica'], utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'replica'],
auto_log=True) auto_log=True)
@ -755,7 +765,8 @@ primary key (name)
'Partitions(master): -80 80-\n' 'Partitions(master): -80 80-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-c0 c0-\n', 'Partitions(replica): -80 80-c0 c0-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
# reparent shard_2 to shard_2_replica1, then insert more data and # reparent shard_2 to shard_2_replica1, then insert more data and
# see it flow through still # see it flow through still
@ -810,7 +821,8 @@ primary key (name)
'Partitions(master): -80 80-c0 c0-\n' 'Partitions(master): -80 80-c0 c0-\n'
'Partitions(rdonly): -80 80-c0 c0-\n' 'Partitions(rdonly): -80 80-c0 c0-\n'
'Partitions(replica): -80 80-c0 c0-\n', 'Partitions(replica): -80 80-c0 c0-\n',
keyspace_id_type=keyspace_id_type) keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
utils.check_tablet_query_service(self, shard_1_master, False, True) utils.check_tablet_query_service(self, shard_1_master, False, True)
# check the binlog players are gone now # check the binlog players are gone now

Просмотреть файл

@ -893,7 +893,8 @@ def wait_db_read_only(uid):
raise e raise e
def check_srv_keyspace(cell, keyspace, expected, keyspace_id_type='uint64'): def check_srv_keyspace(cell, keyspace, expected, keyspace_id_type='uint64',
sharding_column_name='keyspace_id'):
ks = run_vtctl_json(['GetSrvKeyspace', cell, keyspace]) ks = run_vtctl_json(['GetSrvKeyspace', cell, keyspace])
result = '' result = ''
pmap = {} pmap = {}
@ -922,7 +923,7 @@ def check_srv_keyspace(cell, keyspace, expected, keyspace_id_type='uint64'):
'Mismatch in srv keyspace for cell %s keyspace %s, expected:\n%' 'Mismatch in srv keyspace for cell %s keyspace %s, expected:\n%'
's\ngot:\n%s' % ( 's\ngot:\n%s' % (
cell, keyspace, expected, result)) cell, keyspace, expected, result))
if 'keyspace_id' != ks.get('sharding_column_name'): if sharding_column_name != ks.get('sharding_column_name'):
raise Exception('Got wrong sharding_column_name in SrvKeyspace: %s' % raise Exception('Got wrong sharding_column_name in SrvKeyspace: %s' %
str(ks)) str(ks))
if keyspace_id_type != keyrange_constants.PROTO3_KIT_TO_STRING[ if keyspace_id_type != keyrange_constants.PROTO3_KIT_TO_STRING[