Test that effective_caller_id is passed to VTGate.StreamExecute interfaces.

Send both good and bad effective_caller_ids through the interfaces to make sure
that verification is actually done.
This commit is contained in:
Dean Yasuda 2015-08-23 13:33:47 -07:00
Родитель 64b9b34f63
Коммит 59278ea991
3 изменённых файлов: 100 добавлений и 37 удалений

Просмотреть файл

@ -2,6 +2,8 @@
# Use of this source code is governed by a BSD-style license that can # Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file. # be found in the LICENSE file.
"""VTGateCursor, BatchVTGateCursor, and StreamVTGateCursor."""
import itertools import itertools
import operator import operator
import re import re
@ -208,8 +210,9 @@ class BatchVTGateCursor(VTGateCursor):
"""Batch Cursor for VTGate. """Batch Cursor for VTGate.
This cursor allows 'n' queries to be executed against This cursor allows 'n' queries to be executed against
'm' keyspace_ids. For writes though, it maybe prefereable 'm' keyspace_ids. For writes though, it maybe preferable
to only execute against one keyspace_id. to only execute against one keyspace_id.
This only supports keyspace_ids right now since that is what This only supports keyspace_ids right now since that is what
the underlying vtgate server supports. the underlying vtgate server supports.
""" """

Просмотреть файл

@ -2,6 +2,9 @@
# Use of this source code is governed by a BSD-style license that can # Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file. # be found in the LICENSE file.
"""A simple, direct connection to the vttablet query server."""
from itertools import izip from itertools import izip
import logging import logging
import random import random
@ -400,12 +403,12 @@ class VTGateConnection(vtgate_client.VTGateClient):
try: try:
self.client.stream_call(exec_method, req) self.client.stream_call(exec_method, req)
first_response = self.client.stream_next() first_response = self.client.stream_next()
reply = first_response.reply['Result'] if first_response:
reply = first_response.reply['Result']
for field in reply['Fields']: for field in reply['Fields']:
self._stream_fields.append((field['Name'], field['Type'])) self._stream_fields.append((field['Name'], field['Type']))
self._stream_conversions.append( self._stream_conversions.append(
field_types.conversions.get(field['Type'])) field_types.conversions.get(field['Type']))
except gorpc.GoRpcError as e: except gorpc.GoRpcError as e:
self.logger_object.log_private_data(bind_variables) self.logger_object.log_private_data(bind_variables)
raise convert_exception(e, str(self), sql, keyspace_ids, keyranges, raise convert_exception(e, str(self), sql, keyspace_ids, keyranges,

Просмотреть файл

@ -107,8 +107,8 @@ class TestPythonClient(unittest.TestCase):
KEYSPACE_ID_0X80 = struct.Struct('!Q').pack(0x80 << 56) KEYSPACE_ID_0X80 = struct.Struct('!Q').pack(0x80 << 56)
def _open_keyspace_ids_cursor(self): def _open_keyspace_ids_cursor(self):
return self.conn.cursor('keyspace', 'master', return self.conn.cursor(
keyspace_ids=[self.KEYSPACE_ID_0X80]) 'keyspace', 'master', keyspace_ids=[self.KEYSPACE_ID_0X80])
def _open_keyranges_cursor(self): def _open_keyranges_cursor(self):
kr = keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE) kr = keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE)
@ -118,6 +118,17 @@ class TestPythonClient(unittest.TestCase):
return self.conn.cursor( return self.conn.cursor(
tablet_type='master', cursorclass=vtgate_cursor.BatchVTGateCursor) tablet_type='master', cursorclass=vtgate_cursor.BatchVTGateCursor)
def _open_stream_keyranges_cursor(self):
kr = keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE)
return self.conn.cursor(
'keyspace', 'master', keyranges=[kr],
cursorclass=vtgate_cursor.StreamVTGateCursor)
def _open_stream_keyspace_ids_cursor(self):
return self.conn.cursor(
'keyspace', 'master', keyspace_ids=[self.KEYSPACE_ID_0X80],
cursorclass=vtgate_cursor.StreamVTGateCursor)
def test_integrity_error(self): def test_integrity_error(self):
"""Test we correctly raise dbexceptions.IntegrityError. """Test we correctly raise dbexceptions.IntegrityError.
""" """
@ -160,6 +171,9 @@ class TestPythonClient(unittest.TestCase):
cursor.flush() cursor.flush()
cursor.close() cursor.close()
# VTGate.StreamExecuteKeyspaceIds, VTGate.StreamExecuteKeyRanges:
# not handled in vtgateclienttest/services/errors.go.
def test_error(self): def test_error(self):
"""Test a regular server error raises the right exception. """Test a regular server error raises the right exception.
""" """
@ -169,43 +183,86 @@ class TestPythonClient(unittest.TestCase):
self.conn.get_srv_keyspace('error') self.conn.get_srv_keyspace('error')
def test_effective_caller_id(self): def test_effective_caller_id(self):
"""Test that the passed in effective_caller_id is parsed correctly.
Pass a special sql query that sends the expected
effective_caller_id through different vtgate interfaces. Make sure
the good_effective_caller_id works, and the
bad_effective_caller_id raises a DatabaseError.
"""
# Special query that makes vtgateclienttest match effective_caller_id. # Special query that makes vtgateclienttest match effective_caller_id.
effective_caller_id_test_query = ( effective_caller_id_test_query = (
'callerid://{"principal":"pr", "component":"co", "subcomponent":"su"}') 'callerid://{"principal":"pr", "component":"co", "subcomponent":"su"}')
effective_caller_id = { good_effective_caller_id = {
'Principal': 'pr', 'Component': 'co', 'Subcomponent': 'su'} 'Principal': 'pr', 'Component': 'co', 'Subcomponent': 'su'}
bad_effective_caller_id = {
'Principal': 'pr_wrong',
'Component': 'co_wrong', 'Subcomponent': 'su_wrong'}
# ExecuteKeyspaceIds test def check_good_and_bad_effective_caller_ids(cursor, cursor_execute_method):
cursor = self._open_keyspace_ids_cursor() cursor_execute_method(cursor, good_effective_caller_id)
cursor.execute( with self.assertRaises(dbexceptions.DatabaseError):
effective_caller_id_test_query, {}, cursor_execute_method(cursor, bad_effective_caller_id)
effective_caller_id=effective_caller_id) cursor.close()
cursor.close()
# ExecuteKeyRanges test def cursor_execute_keyspace_ids_method(cursor, effective_caller_id):
cursor = self._open_keyranges_cursor() cursor.execute(
cursor.execute( effective_caller_id_test_query, {},
effective_caller_id_test_query, {}, effective_caller_id=effective_caller_id)
effective_caller_id=effective_caller_id)
cursor.close()
# ExecuteEntityIds test check_good_and_bad_effective_caller_ids(
cursor = self.conn.cursor('keyspace', 'master') self._open_keyspace_ids_cursor(), cursor_execute_keyspace_ids_method)
cursor.execute_entity_ids(
effective_caller_id_test_query, {}, def cursor_execute_key_ranges_method(cursor, effective_caller_id):
entity_keyspace_id_map={1: self.KEYSPACE_ID_0X80}, cursor.execute(
entity_column_name='user_id', effective_caller_id_test_query, {},
effective_caller_id=effective_caller_id) effective_caller_id=effective_caller_id)
cursor.close()
check_good_and_bad_effective_caller_ids(
self._open_keyranges_cursor(), cursor_execute_key_ranges_method)
def cursor_execute_entity_ids_method(cursor, effective_caller_id):
cursor.execute_entity_ids(
effective_caller_id_test_query, {},
entity_keyspace_id_map={1: self.KEYSPACE_ID_0X80},
entity_column_name='user_id',
effective_caller_id=effective_caller_id)
check_good_and_bad_effective_caller_ids(
self.conn.cursor('keyspace', 'master'),
cursor_execute_entity_ids_method)
def cursor_execute_batch_keyspace_ids_method(cursor, effective_caller_id):
cursor.execute(
sql=effective_caller_id_test_query, bind_variables={},
keyspace='keyspace',
keyspace_ids=[self.KEYSPACE_ID_0X80])
cursor.flush(effective_caller_id=effective_caller_id)
check_good_and_bad_effective_caller_ids(
self._open_batch_cursor(),
cursor_execute_batch_keyspace_ids_method)
def cursor_stream_execute_keyspace_ids_method(cursor, effective_caller_id):
cursor.execute(
sql=effective_caller_id_test_query, bind_variables={},
effective_caller_id=effective_caller_id)
check_good_and_bad_effective_caller_ids(
self._open_stream_keyspace_ids_cursor(),
cursor_stream_execute_keyspace_ids_method)
def cursor_stream_execute_keyranges_method(cursor, effective_caller_id):
cursor.execute(
sql=effective_caller_id_test_query, bind_variables={},
effective_caller_id=effective_caller_id)
check_good_and_bad_effective_caller_ids(
self._open_stream_keyranges_cursor(),
cursor_stream_execute_keyranges_method)
# ExecuteBatchKeyspaceIds test
cursor = self._open_batch_cursor()
cursor.execute(
sql=effective_caller_id_test_query, bind_variables={},
keyspace='keyspace',
keyspace_ids=[self.KEYSPACE_ID_0X80])
cursor.flush(effective_caller_id=effective_caller_id)
if __name__ == '__main__': if __name__ == '__main__':