зеркало из https://github.com/github/vitess-gh.git
vtgate batch API: improved python client API
The previous python client API bound the batch cursor to a single keyspace_id list, which is not practical for batch commands. This change removes that binding, and instead lets you specify the keyspace and keyspace_ids for each query.
This commit is contained in:
Родитель
80e31020e2
Коммит
7d37338d12
|
@ -138,9 +138,8 @@ def create_batch_cursor_from_cursor(original_cursor, writable=False):
|
|||
raise dbexceptions.ProgrammingError(
|
||||
"Original cursor should be of VTGateCursor type.")
|
||||
batch_cursor = vtgate_cursor.BatchVTGateCursor(
|
||||
original_cursor._conn, original_cursor.keyspace,
|
||||
original_cursor._conn,
|
||||
original_cursor.tablet_type,
|
||||
keyspace_ids=original_cursor.keyspace_ids,
|
||||
writable=writable)
|
||||
return batch_cursor
|
||||
|
||||
|
@ -195,80 +194,6 @@ def db_class_method(*pargs, **kwargs):
|
|||
return classmethod(db_wrapper(*pargs, **kwargs))
|
||||
|
||||
|
||||
def execute_batch_read(cursor, query_list, bind_vars_list, as_transaction=False):
|
||||
"""Method for executing select queries in batch.
|
||||
|
||||
Args:
|
||||
cursor: original cursor - that is converted to read-only BatchVTGateCursor.
|
||||
query_list: query_list.
|
||||
bind_vars_list: bind variables list.
|
||||
|
||||
Returns:
|
||||
Result of the form [[q1row1, q1row2,...], [q2row1, ...],..]
|
||||
|
||||
Raises:
|
||||
dbexceptions.ProgrammingError when dmls are issued to read batch cursor.
|
||||
"""
|
||||
if not isinstance(cursor, vtgate_cursor.VTGateCursor):
|
||||
raise dbexceptions.ProgrammingError(
|
||||
"cursor is not of the type VTGateCursor.")
|
||||
batch_cursor = create_batch_cursor_from_cursor(cursor)
|
||||
for q, bv in zip(query_list, bind_vars_list):
|
||||
if is_dml(q):
|
||||
raise dbexceptions.ProgrammingError("Dml %s for read batch cursor." % q)
|
||||
batch_cursor.execute(q, bv)
|
||||
|
||||
batch_cursor.flush(as_transaction)
|
||||
rowsets = batch_cursor.rowsets
|
||||
result = []
|
||||
# rowset is of the type [(results, rowcount, lastrowid, fields),..]
|
||||
for rowset in rowsets:
|
||||
rowset_results = rowset[0]
|
||||
fields = [f[0] for f in rowset[3]]
|
||||
rows = []
|
||||
for row in rowset_results:
|
||||
rows.append(sql_builder.DBRow(fields, row))
|
||||
result.append(rows)
|
||||
return result
|
||||
|
||||
|
||||
def execute_batch_write(cursor, query_list, bind_vars_list, as_transaction=False):
|
||||
"""Method for executing dml queries in batch.
|
||||
|
||||
Args:
|
||||
cursor: original cursor - that is converted to read-only BatchVTGateCursor.
|
||||
query_list: query_list.
|
||||
bind_vars_list: bind variables list.
|
||||
|
||||
Returns:
|
||||
Result of the form [{'rowcount':rowcount, 'lastrowid':lastrowid}, ...]
|
||||
since for dmls those two values are valuable.
|
||||
|
||||
Raises:
|
||||
dbexceptions.ProgrammingError when non-dmls are issued to writable batch cursor.
|
||||
"""
|
||||
if not isinstance(cursor, vtgate_cursor.VTGateCursor):
|
||||
raise dbexceptions.ProgrammingError(
|
||||
"cursor is not of the type VTGateCursor.")
|
||||
batch_cursor = create_batch_cursor_from_cursor(cursor, writable=True)
|
||||
if batch_cursor.is_writable() and len(batch_cursor.keyspace_ids) != 1:
|
||||
raise dbexceptions.ProgrammingError(
|
||||
"writable batch execute can also execute on one keyspace_id.")
|
||||
for q, bv in zip(query_list, bind_vars_list):
|
||||
if not is_dml(q):
|
||||
raise dbexceptions.ProgrammingError("query %s is not a dml" % q)
|
||||
batch_cursor.execute(q, bv)
|
||||
|
||||
batch_cursor.flush(as_transaction)
|
||||
|
||||
rowsets = batch_cursor.rowsets
|
||||
result = []
|
||||
# rowset is of the type [(results, rowcount, lastrowid, fields),..]
|
||||
for rowset in rowsets:
|
||||
result.append({'rowcount':rowset[1], 'lastrowid':rowset[2]})
|
||||
return result
|
||||
|
||||
|
||||
class InvalidUtf8DbWrite(dbexceptions.Error):
|
||||
"""Raised when an attempt to write invalid utf-8 to the DB is made.
|
||||
"""
|
||||
|
|
|
@ -199,28 +199,32 @@ class BatchVTGateCursor(VTGateCursor):
|
|||
This only supports keyspace_ids right now since that is what
|
||||
the underlying vtgate server supports.
|
||||
"""
|
||||
def __init__(self, connection, keyspace, tablet_type, keyspace_ids=None,
|
||||
writable=False):
|
||||
def __init__(self, connection, tablet_type, writable=False):
|
||||
# rowset is [(results, rowcount, lastrowid, fields),]
|
||||
self.rowsets = None
|
||||
self.query_list = []
|
||||
self.bind_vars_list = []
|
||||
VTGateCursor.__init__(self, connection, keyspace, tablet_type,
|
||||
keyspace_ids=keyspace_ids, writable=writable)
|
||||
self.keyspace_list = []
|
||||
self.keyspace_ids_list = []
|
||||
VTGateCursor.__init__(self, connection, "", tablet_type, writable=writable)
|
||||
|
||||
def execute(self, sql, bind_variables=None):
|
||||
def execute(self, sql, bind_variables, keyspace, keyspace_ids):
|
||||
self.query_list.append(sql)
|
||||
self.bind_vars_list.append(bind_variables)
|
||||
self.keyspace_list.append(keyspace)
|
||||
self.keyspace_ids_list.append(keyspace_ids)
|
||||
|
||||
def flush(self, as_transaction=False):
|
||||
self.rowsets = self._conn._execute_batch(self.query_list,
|
||||
self.bind_vars_list,
|
||||
self.keyspace,
|
||||
self.keyspace_list,
|
||||
self.keyspace_ids_list,
|
||||
self.tablet_type,
|
||||
self.keyspace_ids,
|
||||
as_transaction)
|
||||
self.query_list = []
|
||||
self.bind_vars_list = []
|
||||
self.keyspace_list = []
|
||||
self.keyspace_ids_list = []
|
||||
|
||||
|
||||
class StreamVTGateCursor(VTGateCursor):
|
||||
|
|
|
@ -278,9 +278,9 @@ class VTGateConnection(object):
|
|||
|
||||
|
||||
@vtgate_utils.exponential_backoff_retry((dbexceptions.RequestBacklog))
|
||||
def _execute_batch(self, sql_list, bind_variables_list, keyspace, tablet_type, keyspace_ids, as_transaction):
|
||||
def _execute_batch(self, sql_list, bind_variables_list, keyspace_list, keyspace_ids_list, tablet_type, as_transaction):
|
||||
query_list = []
|
||||
for sql, bind_vars in zip(sql_list, bind_variables_list):
|
||||
for sql, bind_vars, keyspace, keyspace_ids in zip(sql_list, bind_variables_list, keyspace_list, keyspace_ids_list):
|
||||
sql, bind_vars = dbapi.prepare_query_bind_vars(sql, bind_vars)
|
||||
query = {}
|
||||
query['Sql'] = sql
|
||||
|
|
|
@ -390,11 +390,9 @@ class TestVTGateFunctions(unittest.TestCase):
|
|||
{'eid': x, 'id': x, 'keyspace_id': keyspace_id})
|
||||
cursor.commit()
|
||||
kid_list = [pack_kid(kid) for kid in kid_list]
|
||||
cursor = vtgate_conn.cursor(KEYSPACE_NAME, 'master',
|
||||
keyspace_ids=kid_list,
|
||||
cursorclass=vtgate_cursor.BatchVTGateCursor)
|
||||
cursor.execute("select * from vt_insert_test", {})
|
||||
cursor.execute("select * from vt_a", {})
|
||||
cursor = vtgate_conn.cursor('master', cursorclass=vtgate_cursor.BatchVTGateCursor)
|
||||
cursor.execute("select * from vt_insert_test", {}, KEYSPACE_NAME, kid_list)
|
||||
cursor.execute("select * from vt_a", {}, KEYSPACE_NAME, kid_list)
|
||||
cursor.flush()
|
||||
self.assertEqual(cursor.rowsets[0][1], count)
|
||||
self.assertEqual(cursor.rowsets[1][1], count)
|
||||
|
@ -405,26 +403,23 @@ class TestVTGateFunctions(unittest.TestCase):
|
|||
def test_batch_write(self):
|
||||
try:
|
||||
vtgate_conn = get_connection()
|
||||
count = 10
|
||||
query_list = []
|
||||
bind_vars_list = []
|
||||
query_list.append("delete from vt_insert_test")
|
||||
bind_vars_list.append({})
|
||||
cursor = vtgate_conn.cursor('master', cursorclass=vtgate_cursor.BatchVTGateCursor)
|
||||
kid_list = shard_kid_map[shard_names[self.shard_index]]
|
||||
all_ids = [pack_kid(kid) for kid in kid_list]
|
||||
count = 10
|
||||
cursor.execute("delete from vt_insert_test", None, KEYSPACE_NAME, all_ids)
|
||||
for x in xrange(count):
|
||||
keyspace_id = kid_list[x%len(kid_list)]
|
||||
query_list.append("insert into vt_insert_test (msg, keyspace_id) values (%(msg)s, %(keyspace_id)s)")
|
||||
bind_vars_list.append({'msg': 'test %s' % x, 'keyspace_id': keyspace_id})
|
||||
query_list.append("delete from vt_a")
|
||||
bind_vars_list.append({})
|
||||
cursor.execute("insert into vt_insert_test (msg, keyspace_id) values (%(msg)s, %(keyspace_id)s)",
|
||||
{'msg': 'test %s' % x, 'keyspace_id': keyspace_id},
|
||||
KEYSPACE_NAME, [pack_kid(keyspace_id)])
|
||||
cursor.execute("delete from vt_a", None, KEYSPACE_NAME, all_ids)
|
||||
for x in xrange(count):
|
||||
keyspace_id = kid_list[x%len(kid_list)]
|
||||
query_list.append("insert into vt_a (eid, id, keyspace_id) values (%(eid)s, %(id)s, %(keyspace_id)s)")
|
||||
bind_vars_list.append({'eid': x, 'id': x, 'keyspace_id': keyspace_id})
|
||||
vtgate_conn._execute_batch(
|
||||
query_list, bind_vars_list,
|
||||
KEYSPACE_NAME, 'master', keyspace_ids=[pack_kid(kid) for kid in kid_list],
|
||||
as_transaction=True)
|
||||
cursor.execute("insert into vt_a (eid, id, keyspace_id) values (%(eid)s, %(id)s, %(keyspace_id)s)",
|
||||
{'eid': x, 'id': x, 'keyspace_id': keyspace_id},
|
||||
KEYSPACE_NAME, [pack_kid(keyspace_id)])
|
||||
cursor.flush(True)
|
||||
results, rowcount, _, _ = vtgate_conn._execute(
|
||||
"select * from vt_insert_test", {},
|
||||
KEYSPACE_NAME, 'master',
|
||||
|
|
Загрузка…
Ссылка в новой задаче