Module and test to compute the task map and query where clauses for streaming queries.

This commit is contained in:
shrutip 2014-01-03 13:53:03 -08:00
Родитель ac5bc8b290
Коммит 28fdaef641
3 изменённых файлов: 162 добавлений и 0 удалений

Просмотреть файл

@ -41,3 +41,4 @@ integration_test:
cd test ; echo "resharding test"; time ./resharding.py $$VT_TEST_FLAGS
cd test ; echo "resharding_bytes test"; time ./resharding_bytes.py $$VT_TEST_FLAGS
cd test ; echo "vtdb test"; time ./vtdb_test.py $$VT_TEST_FLAGS
cd test ; echo "keyrange test"; time ./keyrange_test.py $$VT_TEST_FLAGS

83
py/vtdb/keyrange.py Normal file
Просмотреть файл

@ -0,0 +1,83 @@
# Copyright 2013, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
from vtdb import dbexceptions
# This module computes task map and query where clause and
# bind_vars for distrubuting the workload of streaming queries.
class StreamingTaskMap(object):
keyrange_list = None
def __init__(self, num_tasks):
self.num_tasks = num_tasks
def compute_kr_list(self):
self.keyrange_list = []
kr_chunks = []
min_key_hex = int("00", base=16)
max_key_hex = int("100", base=16)
kr = min_key_hex
kr_chunks.append('')
span = (max_key_hex - min_key_hex)/self.num_tasks
for i in xrange(self.num_tasks):
kr += span
kr_chunks.append(hex(kr))
kr_chunks[-1] = ''
self.keyrange_list = [(kr_chunks[i], kr_chunks[i+1]) for i in xrange(len(kr_chunks) - 1)]
# Compute the task map for a streaming query.
# global_shard_count is read from config, using it as a param for simplicity.
def create_streaming_task_map(num_tasks, global_shard_count):
# global_shard_count is a configurable value controlled for resharding.
if num_tasks < global_shard_count:
raise dbexceptions.ProgrammingError("Tasks %d cannot be less than number of shards %d" % (num_tasks, global_shard_count))
stm = StreamingTaskMap(num_tasks)
stm.compute_kr_list()
return stm
# We abbreviate the keyranges for ease of use.
# To obtain true value for comparison with keyspace id,
# create true hex value for that keyrange by right padding and conversion.
def _true_keyspace_id_value(kr_value):
if kr_value == '':
return None
if kr_value.startswith('0x'):
kr_value = kr_value.split('0x')[1]
true_hex_val = kr_value + (16-len(kr_value))*'0'
return int(true_hex_val, base=16)
# Compute the where clause and bind_vars for a given keyrange.
def create_where_clause_for_keyrange(keyrange, col_name='keyspace_id'):
kr_min = None
kr_max = None
if isinstance(keyrange, str):
keyrange = keyrange.split('-')
if (isinstance(keyrange, tuple) or isinstance(keyrange, list)) and len(keyrange) == 2:
kr_min = _true_keyspace_id_value(keyrange[0])
kr_max = _true_keyspace_id_value(keyrange[1])
else:
raise dbexceptions.ProgrammingError("keyrange must be a list or tuple or a '-' separated str %s" % keyrange)
where_clause = ''
bind_vars = {}
i = 0
if kr_min is not None:
bind_name = "%s%d" % (col_name, i)
where_clause = "%s >= " % col_name + "%(" + bind_name + ")s"
i += 1
bind_vars[bind_name] = kr_min
if kr_max is not None:
if where_clause != '':
where_clause += ' AND '
bind_name = "%s%d" % (col_name, i)
where_clause += "%s < " % col_name + "%(" + bind_name + ")s"
bind_vars[bind_name] = kr_max
return where_clause, bind_vars

78
test/keyrange_test.py Executable file
Просмотреть файл

@ -0,0 +1,78 @@
#!/usr/bin/python
import unittest
import utils
from vtdb import dbexceptions
from vtdb import keyspace
from vtdb import keyrange
# This unittest tests the computation of task map
# and where clauses for streaming queries.
class TestKeyrange(unittest.TestCase):
def test_incorrect_tasks(self):
global_shard_count = 16
with self.assertRaises(dbexceptions.ProgrammingError):
stm = keyrange.create_streaming_task_map(4, global_shard_count)
def test_keyranges_for_tasks(self):
for global_shard_count in (16,32,64):
num_tasks = global_shard_count
stm = keyrange.create_streaming_task_map(num_tasks, global_shard_count)
self.assertEqual(len(stm.keyrange_list), num_tasks)
num_tasks = global_shard_count*2
stm = keyrange.create_streaming_task_map(num_tasks, global_shard_count)
self.assertEqual(len(stm.keyrange_list), num_tasks)
num_tasks = global_shard_count*2 + 3
stm = keyrange.create_streaming_task_map(num_tasks, global_shard_count)
self.assertEqual(len(stm.keyrange_list), num_tasks)
num_tasks = global_shard_count*8
stm = keyrange.create_streaming_task_map(num_tasks, global_shard_count)
self.assertEqual(len(stm.keyrange_list), num_tasks)
# This tests that the where clause and bind_vars generated for each shard
# against a few sample values.
def test_bind_values_for_keyrange_list(self):
shard_kid_map = {('', '0x10'):[527875958493693904, 626750931627689502, 345387386794260318, 332484755310826578],
('0x10', '0x20'):[1842642426274125671, 1326307661227634652, 1761124146422844620, 1661669973250483744],
('0x20', '0x30'):[3361397649937244239, 3303511690915522723, 2444880764308344533, 2973657788686139039],
('0x30', '0x40'):[3821005920507858605, 4575089859165626432, 3607090456016432961, 3979558375123453425],
('0x40', '0x50'):[5129057445097465905, 5464969577815708398, 5190676584475132364, 5762096070688827561],
('0x50', '0x60'):[6419540613918919447, 6867152356089593986, 6601838130703675400, 6132605084892127391],
('0x60', '0x70'):[7251511061270371980, 7395364497868053835, 7814586147633440734, 7968977924086033834],
('0x70', '0x80'):[8653665459643609079, 8419099072545971426, 9020726671664230611, 9064594986161620444],
('0x80', '0x90'):[9767889778372766922, 9742070682920810358, 10296850775085416642, 9537430901666854108],
('0x90', '0xa0'):[10440455099304929791, 11454183276974683945, 11185910247776122031, 10460396697869122981],
('0xa0', '0xb0'):[11935085245138597119, 12115696589214223782, 12639360876311033978, 12548906240535188165],
('0xb0', '0xc0'):[13379616110062597001, 12826553979133932576, 13288572810772383281, 13471801046560785347],
('0xc0', '0xd0'):[14394342688314745188, 14639660031570920207, 14646353412066152016, 14186650213447467187],
('0xd0', '0xe0'):[15397348460895960623, 16014223083986915239, 15058390871463382185, 15811857963302932363],
('0xe0', '0xf0'):[17275711019497396001, 16979796627403646478, 16635982235308289704, 16906674090344806032],
('0xf0', ''):[18229242992218358675, 17623451135465171527, 18333015752598164958, 17775908119782706671],
}
stm = keyrange.create_streaming_task_map(16, 16)
for i, kr in enumerate(stm.keyrange_list):
where_clause, bind_vars = keyrange.create_where_clause_for_keyrange(kr)
if len(bind_vars.keys()) == 1:
if kr[0] == '':
self.assertNotEqual(where_clause.find('<'), -1)
else:
self.assertNotEqual(where_clause.find('>='), -1)
else:
self.assertNotEqual(where_clause.find('>='), -1)
self.assertNotEqual(where_clause.find('>='), -1)
self.assertNotEqual(where_clause.find('AND'), -1)
kid_list = shard_kid_map[kr]
for keyspace_id in kid_list:
if len(bind_vars.keys()) == 1:
if kr[0] == '':
self.assertLess(keyspace_id, bind_vars['keyspace_id0'])
else:
self.assertGreaterEqual(keyspace_id, bind_vars['keyspace_id0'])
else:
self.assertGreaterEqual(keyspace_id, bind_vars['keyspace_id0'])
self.assertLess(keyspace_id, bind_vars['keyspace_id1'])
if __name__ == '__main__':
utils.main()