зеркало из https://github.com/github/vitess-gh.git
Merge branch 'master' into replication
This commit is contained in:
Коммит
000d280c98
|
@ -8,8 +8,6 @@ from vtdb import dbexceptions
|
||||||
from vtdb import keyrange_constants
|
from vtdb import keyrange_constants
|
||||||
|
|
||||||
|
|
||||||
ZK_KEYSPACE_PATH = '/zk/local/vt/ns'
|
|
||||||
|
|
||||||
pack_keyspace_id = struct.Struct('!Q').pack
|
pack_keyspace_id = struct.Struct('!Q').pack
|
||||||
|
|
||||||
# Represent the SrvKeyspace object from the toposerver, and provide functions
|
# Represent the SrvKeyspace object from the toposerver, and provide functions
|
||||||
|
@ -22,11 +20,6 @@ class Keyspace(object):
|
||||||
sharding_col_type = None
|
sharding_col_type = None
|
||||||
served_from = None
|
served_from = None
|
||||||
|
|
||||||
shard_count = None
|
|
||||||
shard_max_keys = None # Sorted list of the max keys for each shard.
|
|
||||||
shard_names = None # Sorted list of shard names -
|
|
||||||
# these will match the order of shard_max_keys.
|
|
||||||
|
|
||||||
# load this object from a SrvKeyspace object generated by vt
|
# load this object from a SrvKeyspace object generated by vt
|
||||||
def __init__(self, name, data):
|
def __init__(self, name, data):
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -35,13 +28,6 @@ class Keyspace(object):
|
||||||
self.sharding_col_name = data.get('ShardingColumnName', "")
|
self.sharding_col_name = data.get('ShardingColumnName', "")
|
||||||
self.sharding_col_type = data.get('ShardingColumnType', keyrange_constants.KIT_UNSET)
|
self.sharding_col_type = data.get('ShardingColumnType', keyrange_constants.KIT_UNSET)
|
||||||
self.served_from = data.get('ServedFrom', None)
|
self.served_from = data.get('ServedFrom', None)
|
||||||
self.shard_count = len(data['Shards'])
|
|
||||||
# if we have real values for shards and KeyRange.End, grab them
|
|
||||||
if self.shard_count > 1 and data['Shards'][0]['KeyRange']['End'] != "":
|
|
||||||
self.shard_max_keys = [shard['KeyRange']['End']
|
|
||||||
for shard in data['Shards']]
|
|
||||||
# We end up needing the name for addressing so compute this once.
|
|
||||||
self.shard_names = self._make_shard_names()
|
|
||||||
|
|
||||||
def get_shards(self, db_type):
|
def get_shards(self, db_type):
|
||||||
if not db_type:
|
if not db_type:
|
||||||
|
@ -73,36 +59,15 @@ class Keyspace(object):
|
||||||
shard_max_keys = self.get_shard_max_keys(db_type)
|
shard_max_keys = self.get_shard_max_keys(db_type)
|
||||||
if len(shard_max_keys) == 1 and shard_max_keys[0] == keyrange_constants.MAX_KEY:
|
if len(shard_max_keys) == 1 and shard_max_keys[0] == keyrange_constants.MAX_KEY:
|
||||||
return [keyrange_constants.SHARD_ZERO,]
|
return [keyrange_constants.SHARD_ZERO,]
|
||||||
for i, max_key in enumerate(self.shard_max_keys):
|
for i, max_key in enumerate(shard_max_keys):
|
||||||
min_key = keyrange_constants.MIN_KEY
|
min_key = keyrange_constants.MIN_KEY
|
||||||
if i > 0:
|
if i > 0:
|
||||||
min_key = self.shard_max_keys[i-1]
|
min_key = shard_max_keys[i-1]
|
||||||
shard_name = '%s-%s' % (min_key.encode('hex').upper(),
|
shard_name = '%s-%s' % (min_key.encode('hex').upper(),
|
||||||
max_key.encode('hex').upper())
|
max_key.encode('hex').upper())
|
||||||
names.append(shard_name)
|
names.append(shard_name)
|
||||||
return names
|
return names
|
||||||
|
|
||||||
def keyspace_id_to_shard_index_for_db_type(self, keyspace_id, db_type):
|
|
||||||
if not keyspace_id:
|
|
||||||
raise ValueError('keyspace_id is not set')
|
|
||||||
if not db_type:
|
|
||||||
raise ValueError('db_type is not set')
|
|
||||||
# Pack this into big-endian and do a byte-wise comparison.
|
|
||||||
pkid = pack_keyspace_id(keyspace_id)
|
|
||||||
shard_max_keys = self.get_shard_max_keys(db_type)
|
|
||||||
if not shard_max_keys:
|
|
||||||
raise ValueError('Keyspace is not range sharded', self.name)
|
|
||||||
for shard_index, shard_max in enumerate(shard_max_keys):
|
|
||||||
if pkid < shard_max:
|
|
||||||
break
|
|
||||||
# shard_names = self.get_shard_names(db_type)
|
|
||||||
# logging.info('Keyspace resolved %s %s %s %s %s %s', keyspace_id,
|
|
||||||
# pkid.encode('hex').upper(), shard_index,
|
|
||||||
# shard_names[shard_index],
|
|
||||||
# [x.encode('hex').upper() for x in shard_max_keys],
|
|
||||||
# shard_names)
|
|
||||||
return shard_index
|
|
||||||
|
|
||||||
def keyspace_id_to_shard_name_for_db_type(self, keyspace_id, db_type):
|
def keyspace_id_to_shard_name_for_db_type(self, keyspace_id, db_type):
|
||||||
if not keyspace_id:
|
if not keyspace_id:
|
||||||
raise ValueError('keyspace_id is not set')
|
raise ValueError('keyspace_id is not set')
|
||||||
|
@ -119,38 +84,6 @@ class Keyspace(object):
|
||||||
break
|
break
|
||||||
return shard_names[shard_index]
|
return shard_names[shard_index]
|
||||||
|
|
||||||
def keyspace_id_to_shard_index(self, keyspace_id):
|
|
||||||
if not keyspace_id:
|
|
||||||
raise ValueError('keyspace_id is not set')
|
|
||||||
# Pack this into big-endian and do a byte-wise comparison.
|
|
||||||
pkid = pack_keyspace_id(keyspace_id)
|
|
||||||
if not self.shard_max_keys:
|
|
||||||
raise ValueError('Keyspace is not range sharded', self.name)
|
|
||||||
for shard_index, shard_max in enumerate(self.shard_max_keys):
|
|
||||||
if pkid < shard_max:
|
|
||||||
break
|
|
||||||
# logging.info('Keyspace resolved %s %s %s %s %s %s', keyspace_id,
|
|
||||||
# pkid.encode('hex').upper(), shard_index,
|
|
||||||
# self.shard_names[shard_index],
|
|
||||||
# [x.encode('hex').upper() for x in self.shard_max_keys],
|
|
||||||
# self.shard_names)
|
|
||||||
return shard_index
|
|
||||||
|
|
||||||
def _make_shard_names(self):
|
|
||||||
names = []
|
|
||||||
if self.shard_max_keys:
|
|
||||||
for i, max_key in enumerate(self.shard_max_keys):
|
|
||||||
min_key = keyrange_constants.MIN_KEY
|
|
||||||
if i > 0:
|
|
||||||
min_key = self.shard_max_keys[i-1]
|
|
||||||
shard_name = '%s-%s' % (min_key.encode('hex').upper(),
|
|
||||||
max_key.encode('hex').upper())
|
|
||||||
names.append(shard_name)
|
|
||||||
else:
|
|
||||||
# Handle non-range shards
|
|
||||||
names = [str(x) for x in xrange(self.shard_count)]
|
|
||||||
return names
|
|
||||||
|
|
||||||
|
|
||||||
def read_keyspace(topo_client, keyspace_name):
|
def read_keyspace(topo_client, keyspace_name):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -170,9 +170,8 @@ def is_sharded_keyspace(keyspace_name, db_type):
|
||||||
shard_count = ks.get_shard_count(db_type)
|
shard_count = ks.get_shard_count(db_type)
|
||||||
return shard_count > 1
|
return shard_count > 1
|
||||||
|
|
||||||
def get_keyrange_from_shard_name(keyspace, shard_name, db_type='replica'):
|
def get_keyrange_from_shard_name(keyspace, shard_name, db_type):
|
||||||
kr = None
|
kr = None
|
||||||
# db_type is immaterial here.
|
|
||||||
if not is_sharded_keyspace(keyspace, db_type):
|
if not is_sharded_keyspace(keyspace, db_type):
|
||||||
if shard_name == keyrange_constants.SHARD_ZERO:
|
if shard_name == keyrange_constants.SHARD_ZERO:
|
||||||
kr = keyrange_constants.NON_PARTIAL_KEYRANGE
|
kr = keyrange_constants.NON_PARTIAL_KEYRANGE
|
||||||
|
|
|
@ -213,34 +213,27 @@ class TestKeyspace(unittest.TestCase):
|
||||||
|
|
||||||
def test_shard_count(self):
|
def test_shard_count(self):
|
||||||
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
||||||
self.assertEqual(sharded_ks.shard_count, 2)
|
|
||||||
for db_type in ALL_DB_TYPES:
|
for db_type in ALL_DB_TYPES:
|
||||||
self.assertEqual(sharded_ks.get_shard_count(db_type), 2)
|
self.assertEqual(sharded_ks.get_shard_count(db_type), 2)
|
||||||
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
||||||
self.assertEqual(unsharded_ks.shard_count, 1)
|
|
||||||
for db_type in ALL_DB_TYPES:
|
for db_type in ALL_DB_TYPES:
|
||||||
self.assertEqual(unsharded_ks.get_shard_count(db_type), 1)
|
self.assertEqual(unsharded_ks.get_shard_count(db_type), 1)
|
||||||
|
|
||||||
def test_shard_names(self):
|
def test_shard_names(self):
|
||||||
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
||||||
self.assertEqual(sharded_ks.shard_names, ['-80', '80-'])
|
|
||||||
for db_type in ALL_DB_TYPES:
|
for db_type in ALL_DB_TYPES:
|
||||||
self.assertEqual(sharded_ks.get_shard_names(db_type), ['-80', '80-'])
|
self.assertEqual(sharded_ks.get_shard_names(db_type), ['-80', '80-'])
|
||||||
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
||||||
self.assertEqual(unsharded_ks.shard_names, ['0'])
|
|
||||||
for db_type in ALL_DB_TYPES:
|
for db_type in ALL_DB_TYPES:
|
||||||
self.assertEqual(unsharded_ks.get_shard_names(db_type), ['0'])
|
self.assertEqual(unsharded_ks.get_shard_names(db_type), ['0'])
|
||||||
|
|
||||||
def test_shard_max_keys(self):
|
def test_shard_max_keys(self):
|
||||||
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
||||||
want = ['80', '']
|
want = ['80', '']
|
||||||
for i, smk in enumerate(sharded_ks.shard_max_keys):
|
|
||||||
self.assertEqual(smk.encode('hex').upper(), want[i])
|
|
||||||
for db_type in ALL_DB_TYPES:
|
for db_type in ALL_DB_TYPES:
|
||||||
for i, smk in enumerate(sharded_ks.get_shard_max_keys(db_type)):
|
for i, smk in enumerate(sharded_ks.get_shard_max_keys(db_type)):
|
||||||
self.assertEqual(smk.encode('hex').upper(), want[i])
|
self.assertEqual(smk.encode('hex').upper(), want[i])
|
||||||
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
||||||
self.assertEqual(unsharded_ks.shard_max_keys, None)
|
|
||||||
for db_type in ALL_DB_TYPES:
|
for db_type in ALL_DB_TYPES:
|
||||||
self.assertEqual(unsharded_ks.get_shard_max_keys(db_type), [''])
|
self.assertEqual(unsharded_ks.get_shard_max_keys(db_type), [''])
|
||||||
|
|
||||||
|
@ -250,14 +243,6 @@ class TestKeyspace(unittest.TestCase):
|
||||||
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
unsharded_ks = self._read_keyspace(UNSHARDED_KEYSPACE)
|
||||||
self.assertEqual(set(unsharded_ks.db_types), set(ALL_DB_TYPES))
|
self.assertEqual(set(unsharded_ks.db_types), set(ALL_DB_TYPES))
|
||||||
|
|
||||||
|
|
||||||
def test_keyspace_id_to_shard_index(self):
|
|
||||||
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
|
||||||
for i, sn in enumerate(shard_names):
|
|
||||||
for keyspace_id in shard_kid_map[sn]:
|
|
||||||
self.assertEqual(sharded_ks.keyspace_id_to_shard_index(keyspace_id), i)
|
|
||||||
self.assertEqual(sharded_ks.keyspace_id_to_shard_index_for_db_type(keyspace_id, 'master'), i)
|
|
||||||
|
|
||||||
def test_keyspace_id_to_shard_name(self):
|
def test_keyspace_id_to_shard_name(self):
|
||||||
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
sharded_ks = self._read_keyspace(SHARDED_KEYSPACE)
|
||||||
for _, sn in enumerate(shard_names):
|
for _, sn in enumerate(shard_names):
|
||||||
|
|
Загрузка…
Ссылка в новой задаче