stop using hard coded caller id in tablet.py

1. Stop using youtube specific name as caller id.
2. Set caller id as TabletConnection field and use it in the remaining methods.
3. Delete _stream_execute2 in tablet.py since it is now identical to _stream_execute.
This commit is contained in:
Shengzhe Yao 2015-08-25 10:50:26 -07:00
Родитель 1d7691530a
Коммит c4ff493b5b
7 изменённых файлов: 49 добавлений и 87 удалений

Просмотреть файл

@ -76,12 +76,13 @@ class TabletConnection(object):
def __init__( def __init__(
self, addr, tablet_type, keyspace, shard, timeout, user=None, self, addr, tablet_type, keyspace, shard, timeout, user=None,
password=None, keyfile=None, certfile=None): password=None, keyfile=None, certfile=None, caller_id=None):
self.addr = addr self.addr = addr
self.tablet_type = tablet_type self.tablet_type = tablet_type
self.keyspace = keyspace self.keyspace = keyspace
self.shard = shard self.shard = shard
self.timeout = timeout self.timeout = timeout
self.caller_id = caller_id
self.client = bsonrpc.BsonRpcClient(addr, timeout, user, password, self.client = bsonrpc.BsonRpcClient(addr, timeout, user, password,
keyfile=keyfile, certfile=certfile) keyfile=keyfile, certfile=certfile)
self.logger_object = vtdb_logger.get_logger() self.logger_object = vtdb_logger.get_logger()
@ -90,7 +91,7 @@ class TabletConnection(object):
return '<TabletConnection %s %s %s/%s>' % ( return '<TabletConnection %s %s %s/%s>' % (
self.addr, self.tablet_type, self.keyspace, self.shard) self.addr, self.tablet_type, self.keyspace, self.shard)
def dial(self, caller_id='youtube-dev-dedicated'): def dial(self):
try: try:
if self.session_id: if self.session_id:
self.client.close() self.client.close()
@ -107,7 +108,7 @@ class TabletConnection(object):
'Keyspace': self.keyspace, 'Keyspace': self.keyspace,
'Shard': self.shard 'Shard': self.shard
}, },
'ImmediateCallerID': {'Username': caller_id} 'ImmediateCallerID': {'Username': self.caller_id}
} }
response = self.rpc_call_and_extract_error( response = self.rpc_call_and_extract_error(
@ -128,11 +129,11 @@ class TabletConnection(object):
def is_closed(self): def is_closed(self):
return self.client.is_closed() return self.client.is_closed()
def begin(self, caller_id='youtube-dev-dedicated'): def begin(self):
if self.transaction_id: if self.transaction_id:
raise dbexceptions.NotSupportedError('Nested transactions not supported') raise dbexceptions.NotSupportedError('Nested transactions not supported')
req = { req = {
'ImmediateCallerID': {'Username': caller_id}, 'ImmediateCallerID': {'Username': self.caller_id},
'SessionId': self.session_id 'SessionId': self.session_id
} }
try: try:
@ -141,12 +142,12 @@ class TabletConnection(object):
except gorpc.GoRpcError as e: except gorpc.GoRpcError as e:
raise convert_exception(e, str(self)) raise convert_exception(e, str(self))
def commit(self, caller_id='youtube-dev-dedicated'): def commit(self):
if not self.transaction_id: if not self.transaction_id:
return return
req = { req = {
'ImmediateCallerID': {'Username': caller_id}, 'ImmediateCallerID': {'Username': self.caller_id},
'TransactionId': self.transaction_id, 'TransactionId': self.transaction_id,
'SessionId': self.session_id 'SessionId': self.session_id
} }
@ -164,12 +165,12 @@ class TabletConnection(object):
except gorpc.GoRpcError as e: except gorpc.GoRpcError as e:
raise convert_exception(e, str(self)) raise convert_exception(e, str(self))
def rollback(self, caller_id='youtube-dev-dedicated'): def rollback(self):
if not self.transaction_id: if not self.transaction_id:
return return
req = { req = {
'ImmediateCallerID': {'Username': caller_id}, 'ImmediateCallerID': {'Username': self.caller_id},
'TransactionId': self.transaction_id, 'TransactionId': self.transaction_id,
'SessionId': self.session_id 'SessionId': self.session_id
} }
@ -211,7 +212,7 @@ class TabletConnection(object):
raise gorpc.AppError(reply['Err']['Message'], method_name) raise gorpc.AppError(reply['Err']['Message'], method_name)
return response return response
def _execute(self, sql, bind_variables, caller_id='youtube-dev-dedicated'): def _execute(self, sql, bind_variables):
req = { req = {
'QueryRequest': { 'QueryRequest': {
'Sql': sql, 'Sql': sql,
@ -219,7 +220,7 @@ class TabletConnection(object):
'SessionId': self.session_id, 'SessionId': self.session_id,
'TransactionId': self.transaction_id 'TransactionId': self.transaction_id
}, },
'ImmediateCallerID': {'Username': caller_id} 'ImmediateCallerID': {'Username': self.caller_id}
} }
fields = [] fields = []
@ -246,7 +247,7 @@ class TabletConnection(object):
raise raise
return results, rowcount, lastrowid, fields return results, rowcount, lastrowid, fields
def _execute_batch(self, sql_list, bind_variables_list, as_transaction, caller_id='youtube-dev-dedicated'): def _execute_batch(self, sql_list, bind_variables_list, as_transaction):
query_list = [] query_list = []
for sql, bind_vars in zip(sql_list, bind_variables_list): for sql, bind_vars in zip(sql_list, bind_variables_list):
query = {} query = {}
@ -264,7 +265,7 @@ class TabletConnection(object):
'AsTransaction': as_transaction, 'AsTransaction': as_transaction,
'TransactionId': self.transaction_id 'TransactionId': self.transaction_id
}, },
'ImmediateCallerID': {'Username': caller_id} 'ImmediateCallerID': {'Username': self.caller_id}
} }
response = self.rpc_call_and_extract_error('SqlQuery.ExecuteBatch2', req) response = self.rpc_call_and_extract_error('SqlQuery.ExecuteBatch2', req)
@ -295,7 +296,7 @@ class TabletConnection(object):
# we return the fields for the response, and the column conversions # we return the fields for the response, and the column conversions
# the conversions will need to be passed back to _stream_next # the conversions will need to be passed back to _stream_next
# (that way we avoid using a member variable here for such a corner case) # (that way we avoid using a member variable here for such a corner case)
def _stream_execute(self, sql, bind_variables, caller_id='youtube-dev-dedicated'): def _stream_execute(self, sql, bind_variables):
req = { req = {
'Query': { 'Query': {
'Sql': sql, 'Sql': sql,
@ -303,46 +304,7 @@ class TabletConnection(object):
'SessionId': self.session_id, 'SessionId': self.session_id,
'TransactionId': self.transaction_id 'TransactionId': self.transaction_id
}, },
'ImmediateCallerID': {'Username': caller_id} 'ImmediateCallerID': {'Username': self.caller_id}
}
self._stream_fields = []
self._stream_conversions = []
self._stream_result = None
self._stream_result_index = 0
try:
self.client.stream_call('SqlQuery.StreamExecute2', req)
first_response = self.client.stream_next()
reply = first_response.reply
if reply.get('Err'):
self.__drain_conn_after_streaming_app_error()
raise gorpc.AppError(reply['Err'].get(
'Message', 'Missing error message'))
for field in reply['Fields']:
self._stream_fields.append((field['Name'], field['Type']))
self._stream_conversions.append(
field_types.conversions.get(field['Type']))
except gorpc.GoRpcError as e:
self.logger_object.log_private_data(bind_variables)
raise convert_exception(e, str(self), sql)
except:
logging.exception('gorpc low-level error')
raise
return None, 0, 0, self._stream_fields
# we return the fields for the response, and the column conversions
# the conversions will need to be passed back to _stream_next
# (that way we avoid using a member variable here for such a corner case)
def _stream_execute2(self, sql, bind_variables, caller_id='youtube-dev-dedicated'):
req = {
'Query': {
'Sql': sql,
'BindVariables': field_types.convert_bind_vars(bind_variables),
'SessionId': self.session_id,
'TransactionId': self.transaction_id
},
'ImmediateCallerID': {'Username': caller_id}
} }
self._stream_fields = [] self._stream_fields = []

Просмотреть файл

@ -370,7 +370,7 @@ class TestNocache(framework.TestCase):
def test_query_timeout(self): def test_query_timeout(self):
vstart = self.env.debug_vars() vstart = self.env.debug_vars()
conn = tablet_conn.connect(self.env.address, '', 'test_keyspace', '0', 5, user='youtube-dev-dedicated', password='vtpass') conn = tablet_conn.connect(self.env.address, '', 'test_keyspace', '0', 5, user='dev', password='vtpass', caller_id='dev')
cu = cursor.TabletCursor(conn) cu = cursor.TabletCursor(conn)
self.env.execute("set vt_query_timeout=0.25") self.env.execute("set vt_query_timeout=0.25")
try: try:

Просмотреть файл

@ -54,7 +54,7 @@ class TestEnv(object):
return "localhost:%s" % self.port return "localhost:%s" % self.port
def connect(self): def connect(self):
c = tablet_conn.connect(self.address, '', 'test_keyspace', '0', 2, user='youtube-dev-dedicated', password='vtpass') c = tablet_conn.connect(self.address, '', 'test_keyspace', '0', 2, user='dev', password='vtpass', caller_id='dev')
c.max_attempts = 1 c.max_attempts = 1
return c return c

Просмотреть файл

@ -351,7 +351,7 @@ class Tablet(object):
def conn(self, user=None, password=None): def conn(self, user=None, password=None):
conn = tablet.TabletConnection( conn = tablet.TabletConnection(
'localhost:%d' % self.port, self.tablet_type, self.keyspace, 'localhost:%d' % self.port, self.tablet_type, self.keyspace,
self.shard, 30) self.shard, 30, caller_id='dev')
conn.dial() conn.dial()
return conn return conn

Просмотреть файл

@ -14,7 +14,7 @@ from vtdb import tablet
class TestRPCCallAndExtract(unittest.TestCase): class TestRPCCallAndExtract(unittest.TestCase):
"""Tests rpc_call_and_extract_error is tolerant to various responses.""" """Tests rpc_call_and_extract_error is tolerant to various responses."""
tablet_conn = tablet.TabletConnection('addr', 'type', 'keyspace', 'shard', 30) tablet_conn = tablet.TabletConnection('addr', 'type', 'keyspace', 'shard', 30, caller_id='dev')
def test_reply_is_none(self): def test_reply_is_none(self):
with mock.patch.object(self.tablet_conn, 'client', autospec=True) as mock_client: with mock.patch.object(self.tablet_conn, 'client', autospec=True) as mock_client:

Просмотреть файл

@ -1,4 +1,4 @@
{ {
"ala": ["ma kota", "miala kota"], "ala": ["ma kota", "miala kota"],
"youtube-dev-dedicated": ["vtpass", "vtpasssec"] "dev": ["vtpass", "vtpasssec"]
} }

Просмотреть файл

@ -3,51 +3,51 @@
{ {
"name": "mysql", "name": "mysql",
"table_names_or_prefixes": [""], "table_names_or_prefixes": [""],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_cached", "name": "vtocc_cached",
"table_names_or_prefixes": ["vtocc_nocache", "vtocc_cached%"], "table_names_or_prefixes": ["vtocc_nocache", "vtocc_cached%"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_renamed", "name": "vtocc_renamed",
"table_names_or_prefixes": ["vtocc_renamed%"], "table_names_or_prefixes": ["vtocc_renamed%"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_part", "name": "vtocc_part",
"table_names_or_prefixes": ["vtocc_part%"], "table_names_or_prefixes": ["vtocc_part%"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc", "name": "vtocc",
"table_names_or_prefixes": ["vtocc_a", "vtocc_b", "vtocc_c", "dual", "vtocc_d", "vtocc_temp", "vtocc_e", "vtocc_f", "upsert_test", "vtocc_strings", "vtocc_fracts", "vtocc_ints", "vtocc_misc", "vtocc_big", "vtocc_view"], "table_names_or_prefixes": ["vtocc_a", "vtocc_b", "vtocc_c", "dual", "vtocc_d", "vtocc_temp", "vtocc_e", "vtocc_f", "upsert_test", "vtocc_strings", "vtocc_fracts", "vtocc_ints", "vtocc_misc", "vtocc_big", "vtocc_view"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_test", "name": "vtocc_test",
"table_names_or_prefixes": ["vtocc_test"], "table_names_or_prefixes": ["vtocc_test"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_acl_unmatched", "name": "vtocc_acl_unmatched",
"table_names_or_prefixes": ["vtocc_acl_unmatched"], "table_names_or_prefixes": ["vtocc_acl_unmatched"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_acl_no_access", "name": "vtocc_acl_no_access",
@ -56,25 +56,25 @@
{ {
"name": "vtocc_acl_read_only", "name": "vtocc_acl_read_only",
"table_names_or_prefixes": ["vtocc_acl_read_only"], "table_names_or_prefixes": ["vtocc_acl_read_only"],
"readers": ["youtube-dev-dedicated"] "readers": ["dev"]
}, },
{ {
"name": "vtocc_acl_read_write", "name": "vtocc_acl_read_write",
"table_names_or_prefixes": ["vtocc_acl_read_write"], "table_names_or_prefixes": ["vtocc_acl_read_write"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"] "writers": ["dev"]
}, },
{ {
"name": "vtocc_acl_admin", "name": "vtocc_acl_admin",
"table_names_or_prefixes": ["vtocc_acl_admin"], "table_names_or_prefixes": ["vtocc_acl_admin"],
"readers": ["youtube-dev-dedicated"], "readers": ["dev"],
"writers": ["youtube-dev-dedicated"], "writers": ["dev"],
"admins": ["youtube-dev-dedicated"] "admins": ["dev"]
}, },
{ {
"name": "vtocc_acl_all_user_read_only", "name": "vtocc_acl_all_user_read_only",
"table_names_or_prefixes": ["vtocc_acl_all_user_read_only"], "table_names_or_prefixes": ["vtocc_acl_all_user_read_only"],
"readers": ["youtube-dev-dedicated"] "readers": ["dev"]
} }
] ]
} }