This commit is contained in:
Alain Jobart 2016-06-13 08:18:03 -07:00
Родитель e4de2a8ee8
Коммит 8658d455a0
4 изменённых файлов: 65 добавлений и 11 удалений

Просмотреть файл

@ -194,6 +194,20 @@ func (rtr *Router) StreamExecuteRoute(vcursor *requestContext, route *engine.Rou
)
}
// IsKeyspaceRangeBasedSharded returns true if the keyspace in the vschema is
// marked as sharded.
func (rtr *Router) IsKeyspaceRangeBasedSharded(keyspace string) bool {
vschema := rtr.planner.VSchema()
ks, ok := vschema.Keyspaces[keyspace]
if !ok {
return false
}
if ks.Keyspace == nil {
return false
}
return ks.Keyspace.Sharded
}
func (rtr *Router) paramsUnsharded(vcursor *requestContext, route *engine.Route) (*scatterParams, error) {
ks, _, allShards, err := getKeyspaceShards(vcursor.ctx, rtr.serv, rtr.cell, route.Keyspace.Name, vcursor.tabletType)
if err != nil {

Просмотреть файл

@ -530,6 +530,27 @@ func (vtg *VTGate) Rollback(ctx context.Context, session *vtgatepb.Session) erro
return formatError(vtg.resolver.Rollback(ctx, session))
}
// isKeyspaceRangeBasedSharded returns true if a keyspace is sharded
// by range. This is true when there is a ShardingColumnType defined
// in the SrvKeyspace (that is using the range-based sharding with the
// client specifying the sharding key), or when the VSchema for the
// keyspace is Sharded.
func (vtg *VTGate) isKeyspaceRangeBasedSharded(keyspace string, srvKeyspace *topodatapb.SrvKeyspace) bool {
if srvKeyspace.ShardingColumnType != topodatapb.KeyspaceIdType_UNSET {
// We are using range based sharding with the application
// providing the sharding key value.
return true
}
if vtg.router.IsKeyspaceRangeBasedSharded(keyspace) {
// We are using range based sharding with the VSchema
// poviding the routing information
return true
}
// Not range based sharded, might be un-sharded or custom sharded.
return false
}
// SplitQuery splits a query into sub queries by appending keyranges and
// primary key range clauses. Rows corresponding to the sub queries
// are guaranteed to be non-overlapping and will add up to the rows of
@ -545,7 +566,7 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, keyspace string, sql string,
// sharding_column_type != KeyspaceIdType_UNSET can happen in one of the following two cases:
// 1. We are querying a sharded keyspace;
// 2. We are querying an unsharded keyspace which is being sharded.
if srvKeyspace.ShardingColumnType != topodatapb.KeyspaceIdType_UNSET {
if vtg.isKeyspaceRangeBasedSharded(keyspace, srvKeyspace) {
// we are using range-based sharding, so the result
// will be a list of Splits with KeyRange clauses
keyRangeByShard := make(map[string]*topodatapb.KeyRange)
@ -600,7 +621,7 @@ func (vtg *VTGate) SplitQueryV2(
// sharding_column_type != KeyspaceIdType_UNSET can happen in one of the following two cases:
// 1. We are querying a sharded keyspace;
// 2. We are querying an unsharded keyspace which is being sharded.
if srvKeyspace.ShardingColumnType != topodatapb.KeyspaceIdType_UNSET {
if vtg.isKeyspaceRangeBasedSharded(keyspace, srvKeyspace) {
// Index the shard references in 'shardRefs' by shard name.
shardRefByName := make(map[string]*topodatapb.ShardReference, len(shardRefs))
for _, shardRef := range shardRefs {

Просмотреть файл

@ -55,8 +55,6 @@ class TestEnv(object):
self._init_tablet(keyspace, shard, 'rdonly', i, tablet_index)
tablet_index += 1
utils.run_vtctl(['RebuildKeyspaceGraph', keyspace], auto_log=True)
# Start tablets.
for shard in shards:
self._start_tablet(keyspace, shard, 'master', None)
@ -76,16 +74,9 @@ class TestEnv(object):
t.tablet_alias], auto_log=True)
t.tablet_type = 'master'
for t in self.tablets:
if t.tablet_type == 'replica':
utils.wait_for_tablet_type(t.tablet_alias, 'replica')
elif t.tablet_type == 'rdonly':
utils.wait_for_tablet_type(t.tablet_alias, 'rdonly')
for t in self.tablets:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['RebuildKeyspaceGraph', keyspace], auto_log=True)
for ddl in ddls:
fname = os.path.join(environment.tmproot, 'ddl.sql')
with open(fname, 'w') as f:

Просмотреть файл

@ -260,6 +260,7 @@ def setUpModule():
create_join_user_extra,
create_join_name_info,
],
rdonly_count=1, # to test SplitQuery
)
keyspace_env.launch(
'lookup',
@ -1055,5 +1056,32 @@ class TestVTGateFunctions(unittest.TestCase):
bindvars={'user_id': 11})
self.assertEqual(len(qr['rows'] or []), 0)
def test_split_query(self):
"""This test uses 'vtctl VtGateSplitQuery' to validate the Map-Reduce APIs.
We want to return KeyRange queries.
"""
sql = 'select id, name from vt_user'
s = utils.vtgate.split_query(sql, 'user', 2)
self.assertEqual(len(s), 2)
first_half_queries = 0
second_half_queries = 0
for q in s:
self.assertEqual(q['query']['sql'], sql)
self.assertIn('key_range_part', q)
self.assertEqual(len(q['key_range_part']['key_ranges']), 1)
kr = q['key_range_part']['key_ranges'][0]
eighty_in_base64 = 'gA=='
is_first_half = 'start' not in kr and kr['end'] == eighty_in_base64
is_second_half = 'end' not in kr and kr['start'] == eighty_in_base64
self.assertTrue(is_first_half or is_second_half,
'invalid keyrange %s' % str(kr))
if is_first_half:
first_half_queries += 1
else:
second_half_queries += 1
self.assertEqual(first_half_queries, 1, 'invalid split %s' % str(s))
self.assertEqual(second_half_queries, 1, 'invalid split %s' % str(s))
if __name__ == '__main__':
utils.main()