diff --git a/py/vtdb/vtgate_cursor.py b/py/vtdb/vtgate_cursor.py index 314e0c564b..df79b6cdb9 100644 --- a/py/vtdb/vtgate_cursor.py +++ b/py/vtdb/vtgate_cursor.py @@ -2,6 +2,8 @@ # Use of this source code is governed by a BSD-style license that can # be found in the LICENSE file. +"""VTGateCursor, BatchVTGateCursor, and StreamVTGateCursor.""" + import itertools import operator import re @@ -208,8 +210,9 @@ class BatchVTGateCursor(VTGateCursor): """Batch Cursor for VTGate. This cursor allows 'n' queries to be executed against - 'm' keyspace_ids. For writes though, it maybe prefereable + 'm' keyspace_ids. For writes though, it maybe preferable to only execute against one keyspace_id. + This only supports keyspace_ids right now since that is what the underlying vtgate server supports. """ diff --git a/py/vtdb/vtgatev2.py b/py/vtdb/vtgatev2.py index e6ae1e56c0..d475d6bb1f 100644 --- a/py/vtdb/vtgatev2.py +++ b/py/vtdb/vtgatev2.py @@ -2,6 +2,9 @@ # Use of this source code is governed by a BSD-style license that can # be found in the LICENSE file. +"""A simple, direct connection to the vttablet query server.""" + + from itertools import izip import logging import random @@ -400,12 +403,12 @@ class VTGateConnection(vtgate_client.VTGateClient): try: self.client.stream_call(exec_method, req) first_response = self.client.stream_next() - reply = first_response.reply['Result'] - - for field in reply['Fields']: - self._stream_fields.append((field['Name'], field['Type'])) - self._stream_conversions.append( - field_types.conversions.get(field['Type'])) + if first_response: + reply = first_response.reply['Result'] + for field in reply['Fields']: + self._stream_fields.append((field['Name'], field['Type'])) + self._stream_conversions.append( + field_types.conversions.get(field['Type'])) except gorpc.GoRpcError as e: self.logger_object.log_private_data(bind_variables) raise convert_exception(e, str(self), sql, keyspace_ids, keyranges, diff --git a/test/python_client_test.py b/test/python_client_test.py index 6fc4daef89..dd3f713eae 100755 --- a/test/python_client_test.py +++ b/test/python_client_test.py @@ -107,8 +107,8 @@ class TestPythonClient(unittest.TestCase): KEYSPACE_ID_0X80 = struct.Struct('!Q').pack(0x80 << 56) def _open_keyspace_ids_cursor(self): - return self.conn.cursor('keyspace', 'master', - keyspace_ids=[self.KEYSPACE_ID_0X80]) + return self.conn.cursor( + 'keyspace', 'master', keyspace_ids=[self.KEYSPACE_ID_0X80]) def _open_keyranges_cursor(self): kr = keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE) @@ -118,6 +118,17 @@ class TestPythonClient(unittest.TestCase): return self.conn.cursor( tablet_type='master', cursorclass=vtgate_cursor.BatchVTGateCursor) + def _open_stream_keyranges_cursor(self): + kr = keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE) + return self.conn.cursor( + 'keyspace', 'master', keyranges=[kr], + cursorclass=vtgate_cursor.StreamVTGateCursor) + + def _open_stream_keyspace_ids_cursor(self): + return self.conn.cursor( + 'keyspace', 'master', keyspace_ids=[self.KEYSPACE_ID_0X80], + cursorclass=vtgate_cursor.StreamVTGateCursor) + def test_integrity_error(self): """Test we correctly raise dbexceptions.IntegrityError. """ @@ -160,6 +171,9 @@ class TestPythonClient(unittest.TestCase): cursor.flush() cursor.close() + # VTGate.StreamExecuteKeyspaceIds, VTGate.StreamExecuteKeyRanges: + # not handled in vtgateclienttest/services/errors.go. + def test_error(self): """Test a regular server error raises the right exception. """ @@ -169,43 +183,86 @@ class TestPythonClient(unittest.TestCase): self.conn.get_srv_keyspace('error') def test_effective_caller_id(self): + """Test that the passed in effective_caller_id is parsed correctly. + + Pass a special sql query that sends the expected + effective_caller_id through different vtgate interfaces. Make sure + the good_effective_caller_id works, and the + bad_effective_caller_id raises a DatabaseError. + + """ # Special query that makes vtgateclienttest match effective_caller_id. effective_caller_id_test_query = ( 'callerid://{"principal":"pr", "component":"co", "subcomponent":"su"}') - effective_caller_id = { + good_effective_caller_id = { 'Principal': 'pr', 'Component': 'co', 'Subcomponent': 'su'} + bad_effective_caller_id = { + 'Principal': 'pr_wrong', + 'Component': 'co_wrong', 'Subcomponent': 'su_wrong'} - # ExecuteKeyspaceIds test - cursor = self._open_keyspace_ids_cursor() - cursor.execute( - effective_caller_id_test_query, {}, - effective_caller_id=effective_caller_id) - cursor.close() + def check_good_and_bad_effective_caller_ids(cursor, cursor_execute_method): + cursor_execute_method(cursor, good_effective_caller_id) + with self.assertRaises(dbexceptions.DatabaseError): + cursor_execute_method(cursor, bad_effective_caller_id) + cursor.close() - # ExecuteKeyRanges test - cursor = self._open_keyranges_cursor() - cursor.execute( - effective_caller_id_test_query, {}, - effective_caller_id=effective_caller_id) - cursor.close() + def cursor_execute_keyspace_ids_method(cursor, effective_caller_id): + cursor.execute( + effective_caller_id_test_query, {}, + effective_caller_id=effective_caller_id) - # ExecuteEntityIds test - cursor = self.conn.cursor('keyspace', 'master') - cursor.execute_entity_ids( - effective_caller_id_test_query, {}, - entity_keyspace_id_map={1: self.KEYSPACE_ID_0X80}, - entity_column_name='user_id', - effective_caller_id=effective_caller_id) - cursor.close() + check_good_and_bad_effective_caller_ids( + self._open_keyspace_ids_cursor(), cursor_execute_keyspace_ids_method) + + def cursor_execute_key_ranges_method(cursor, effective_caller_id): + cursor.execute( + effective_caller_id_test_query, {}, + effective_caller_id=effective_caller_id) + + check_good_and_bad_effective_caller_ids( + self._open_keyranges_cursor(), cursor_execute_key_ranges_method) + + def cursor_execute_entity_ids_method(cursor, effective_caller_id): + cursor.execute_entity_ids( + effective_caller_id_test_query, {}, + entity_keyspace_id_map={1: self.KEYSPACE_ID_0X80}, + entity_column_name='user_id', + effective_caller_id=effective_caller_id) + + check_good_and_bad_effective_caller_ids( + self.conn.cursor('keyspace', 'master'), + cursor_execute_entity_ids_method) + + def cursor_execute_batch_keyspace_ids_method(cursor, effective_caller_id): + cursor.execute( + sql=effective_caller_id_test_query, bind_variables={}, + keyspace='keyspace', + keyspace_ids=[self.KEYSPACE_ID_0X80]) + cursor.flush(effective_caller_id=effective_caller_id) + + check_good_and_bad_effective_caller_ids( + self._open_batch_cursor(), + cursor_execute_batch_keyspace_ids_method) + + def cursor_stream_execute_keyspace_ids_method(cursor, effective_caller_id): + cursor.execute( + sql=effective_caller_id_test_query, bind_variables={}, + effective_caller_id=effective_caller_id) + + check_good_and_bad_effective_caller_ids( + self._open_stream_keyspace_ids_cursor(), + cursor_stream_execute_keyspace_ids_method) + + def cursor_stream_execute_keyranges_method(cursor, effective_caller_id): + cursor.execute( + sql=effective_caller_id_test_query, bind_variables={}, + effective_caller_id=effective_caller_id) + + check_good_and_bad_effective_caller_ids( + self._open_stream_keyranges_cursor(), + cursor_stream_execute_keyranges_method) - # ExecuteBatchKeyspaceIds test - cursor = self._open_batch_cursor() - cursor.execute( - sql=effective_caller_id_test_query, bind_variables={}, - keyspace='keyspace', - keyspace_ids=[self.KEYSPACE_ID_0X80]) - cursor.flush(effective_caller_id=effective_caller_id) if __name__ == '__main__':