Making vertical_split.py always and only work with vtgate.

This commit is contained in:
Alain Jobart 2015-08-18 08:09:56 -07:00
Родитель 3428cd2e61
Коммит 81571296b8
4 изменённых файлов: 40 добавлений и 144 удалений

Просмотреть файл

@ -97,7 +97,6 @@ small_integration_test_files = \
tablet_test.py \ tablet_test.py \
sql_builder_test.py \ sql_builder_test.py \
vertical_split.py \ vertical_split.py \
vertical_split_vtgate.py \
schema.py \ schema.py \
keyspace_test.py \ keyspace_test.py \
keyrange_test.py \ keyrange_test.py \
@ -138,7 +137,6 @@ worker_integration_test_files = \
resharding.py \ resharding.py \
resharding_bytes.py \ resharding_bytes.py \
vertical_split.py \ vertical_split.py \
vertical_split_vtgate.py \
initial_sharding.py \ initial_sharding.py \
initial_sharding_bytes.py \ initial_sharding_bytes.py \
worker.py worker.py

Просмотреть файл

@ -17,9 +17,6 @@
"vertical_split": { "vertical_split": {
"File": "vertical_split.py" "File": "vertical_split.py"
}, },
"vertical_split_vtgate": {
"File": "vertical_split_vtgate.py"
},
"schema": { "schema": {
"File": "schema.py" "File": "schema.py"
}, },

Просмотреть файл

@ -5,24 +5,17 @@
# be found in the LICENSE file. # be found in the LICENSE file.
import logging import logging
import threading
import struct
import time import time
import unittest import unittest
from zk import zkocc from vtdb import keyrange
from vtdb import dbexceptions from vtdb import keyrange_constants
from vtdb import topology from vtdb import vtgate_client
from vtdb import vtclient
import environment import environment
import utils import utils
import tablet import tablet
from protocols_flavor import protocols_flavor
TABLET = 'tablet'
VTGATE = 'vtgate'
VTGATE_PROTOCOL_TABLET = 'v0'
client_type = TABLET
# source keyspace, with 4 tables # source keyspace, with 4 tables
source_master = tablet.Tablet() source_master = tablet.Tablet()
@ -51,6 +44,7 @@ def setUpModule():
destination_rdonly2.init_mysql(), destination_rdonly2.init_mysql(),
] ]
utils.Vtctld().start() utils.Vtctld().start()
utils.VtGate().start(cache_ttl='0s')
utils.wait_procs(setup_procs) utils.wait_procs(setup_procs)
except: except:
tearDownModule() tearDownModule()
@ -61,6 +55,7 @@ def tearDownModule():
if utils.options.skip_teardown: if utils.options.skip_teardown:
return return
utils.vtgate.kill()
teardown_procs = [ teardown_procs = [
source_master.teardown_mysql(), source_master.teardown_mysql(),
source_replica.teardown_mysql(), source_replica.teardown_mysql(),
@ -88,21 +83,7 @@ def tearDownModule():
class TestVerticalSplit(unittest.TestCase): class TestVerticalSplit(unittest.TestCase):
def setUp(self): def setUp(self):
utils.VtGate().start(cache_ttl='0s')
self.vtgate_client = zkocc.ZkOccConnection(utils.vtgate.addr(),
'test_nj', 30.0)
self.vtgate_addrs = None
if client_type == VTGATE:
self.vtgate_addrs = {'vt': [utils.vtgate.addr(),]}
self.insert_index = 0 self.insert_index = 0
# Lowering the keyspace refresh throttle so things are testable.
self.throttle_sleep_interval = 0.1
topology.set_keyspace_fetch_throttle(0.01)
def tearDown(self):
self.vtgate_client.close()
utils.vtgate.kill()
def _create_source_schema(self): def _create_source_schema(self):
create_table_template = '''create table %s( create_table_template = '''create table %s(
@ -123,31 +104,23 @@ index by_msg (msg)
'source_keyspace'], 'source_keyspace'],
auto_log=True) auto_log=True)
def _vtdb_conn(self, db_type='master', keyspace='source_keyspace'): def _vtdb_conn(self):
global client_type addr = utils.vtgate.rpc_endpoint()
vtgate_protocol = None protocol = protocols_flavor().vtgate_python_protocol()
if self.vtgate_addrs is None: return vtgate_client.connect(protocol, addr, 30.0)
self.vtgate_addrs = {}
if client_type == TABLET:
vtgate_protocol = VTGATE_PROTOCOL_TABLET
conn = vtclient.VtOCCConnection(self.vtgate_client, keyspace, '0',
db_type, 30,
vtgate_protocol=vtgate_protocol,
vtgate_addrs=self.vtgate_addrs)
conn.connect()
return conn
# insert some values in the source master db, return the first id used # insert some values in the source master db, return the first id used
def _insert_values(self, table, count): def _insert_values(self, table, count):
result = self.insert_index result = self.insert_index
conn = self._vtdb_conn()
cursor = conn.cursor('source_keyspace', 'master', keyranges=[keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE)], writable=True)
for i in xrange(count): for i in xrange(count):
source_master.execute('insert into %s (id, msg) values(:id, :msg)' % conn.begin()
table, cursor.execute("insert into %s (id, msg) values(%d, 'value %d')" % (
bindvars={ table, self.insert_index, self.insert_index), {})
'id': self.insert_index, conn.commit()
'msg': 'value %d' % self.insert_index,
})
self.insert_index += 1 self.insert_index += 1
conn.close()
return result return result
def _check_values(self, tablet, dbname, table, first, count): def _check_values(self, tablet, dbname, table, first, count):
@ -222,31 +195,31 @@ index by_msg (msg)
logging.debug('Got %s rows from table %s on tablet %s', logging.debug('Got %s rows from table %s on tablet %s',
qr['Rows'][0][0], t, tablet.tablet_alias) qr['Rows'][0][0], t, tablet.tablet_alias)
def _populate_topo_cache(self): def _check_client_conn_redirection(self, source_ks, destination_ks, db_types, servedfrom_db_types, moved_tables=None):
topology.read_topology(self.vtgate_client) # check that the ServedFrom indirection worked correctly.
if moved_tables is None:
def refresh_keyspace(self, keyspace_name): moved_tables = []
# This is so that keyspace can be refreshed. conn = self._vtdb_conn()
time.sleep(self.throttle_sleep_interval)
topology.refresh_keyspace(self.vtgate_client, keyspace_name)
def _check_client_conn_redirection(self, source_ks, destination_ks, db_types,
servedfrom_db_types, moved_tables=None):
# In normal operations, it takes the first error for keyspace to be re-read.
# For testing purposes, refreshing the topology manually.
self.refresh_keyspace(destination_ks)
for db_type in servedfrom_db_types: for db_type in servedfrom_db_types:
conn = self._vtdb_conn(db_type, keyspace=destination_ks) for tbl in moved_tables:
self.assertEqual(conn.db_params['keyspace'], source_ks) try:
rows = conn._execute("select * from %s" % tbl, {}, destination_ks, db_type, keyranges=[keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE)])
# check that the connection to db_type for destination keyspace works too. logging.debug("Select on %s.%s returned %d rows" % (db_type, tbl, len(rows)))
for db_type in db_types: except Exception, e:
dest_conn = self._vtdb_conn(db_type, keyspace=destination_ks) self.fail("Execute failed w/ exception %s" % str(e))
self.assertEqual(dest_conn.db_params['keyspace'], destination_ks)
def _check_stats(self): def _check_stats(self):
pass v = utils.vtgate.get_vars()
self.assertEqual(v['VttabletCall']['Histograms']['Execute.source_keyspace.0.replica']['Count'], 2, "unexpected value for VttabletCall(Execute.source_keyspace.0.replica) inside %s" % str(v))
self.assertEqual(v['VtgateApi']['Histograms']['ExecuteKeyRanges.destination_keyspace.master']['Count'], 6, "unexpected value for VtgateApi(ExecuteKeyRanges.destination_keyspace.master) inside %s" % str(v))
self.assertEqual(len(v['VtgateApiErrorCounts']), 0, "unexpected errors for VtgateApiErrorCounts inside %s" % str(v))
self.assertEqual(
v['ResilientSrvTopoServerEndPointsReturnedCount']['test_nj.source_keyspace.0.master'] /
v['ResilientSrvTopoServerEndPointQueryCount']['test_nj.source_keyspace.0.master'],
1, "unexpected EndPointsReturnedCount inside %s" % str(v))
self.assertNotIn(
'test_nj.source_keyspace.0.master', v['ResilientSrvTopoServerEndPointDegradedResultCount'],
"unexpected EndPointDegradedResultCount inside %s" % str(v))
def test_vertical_split(self): def test_vertical_split(self):
utils.run_vtctl(['CreateKeyspace', 'source_keyspace']) utils.run_vtctl(['CreateKeyspace', 'source_keyspace'])
@ -301,9 +274,6 @@ index by_msg (msg)
utils.run_vtctl(['InitShardMaster', 'destination_keyspace/0', utils.run_vtctl(['InitShardMaster', 'destination_keyspace/0',
destination_master.tablet_alias], auto_log=True) destination_master.tablet_alias], auto_log=True)
# read all the keyspaces, this will populate the topology cache.
self._populate_topo_cache()
# create the schema on the source keyspace, add some values # create the schema on the source keyspace, add some values
self._create_source_schema() self._create_source_schema()
moving1_first = self._insert_values('moving1', 100) moving1_first = self._insert_values('moving1', 100)
@ -343,8 +313,6 @@ index by_msg (msg)
utils.run_vtctl(['ChangeSlaveType', source_rdonly2.tablet_alias, utils.run_vtctl(['ChangeSlaveType', source_rdonly2.tablet_alias,
'rdonly'], auto_log=True) 'rdonly'], auto_log=True)
topology.refresh_keyspace(self.vtgate_client, 'destination_keyspace')
# check values are present # check values are present
self._check_values(destination_master, 'vt_destination_keyspace', 'moving1', self._check_values(destination_master, 'vt_destination_keyspace', 'moving1',
moving1_first, 100) moving1_first, 100)
@ -522,7 +490,7 @@ index by_msg (msg)
# check the binlog player is gone now # check the binlog player is gone now
destination_master.wait_for_binlog_player_count(0) destination_master.wait_for_binlog_player_count(0)
# optional method to check the stats are correct # check the stats are correct
self._check_stats() self._check_stats()
# kill everything # kill everything

Просмотреть файл

@ -1,67 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2013, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
import logging
import vertical_split
import utils
from vtdb import keyrange
from vtdb import keyrange_constants
from vtdb import vtgatev2
def setUpModule():
vertical_split.setUpModule()
def tearDownModule():
vertical_split.tearDownModule()
class TestVerticalSplitVTGate(vertical_split.TestVerticalSplit):
def _vtdb_conn(self):
conn = vtgatev2.connect(self.vtgate_addrs['vt'], 30)
return conn
def _insert_values(self, table, count, db_type='master', keyspace='source_keyspace'):
result = self.insert_index
conn = self._vtdb_conn()
cursor = conn.cursor(keyspace, db_type, keyranges=[keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE)], writable=True)
for i in xrange(count):
conn.begin()
cursor.execute("insert into %s (id, msg) values(%d, 'value %d')" % (
table, self.insert_index, self.insert_index), {})
conn.commit()
self.insert_index += 1
conn.close()
return result
def _check_client_conn_redirection(self, source_ks, destination_ks, db_types, servedfrom_db_types, moved_tables=None):
# check that the ServedFrom indirection worked correctly.
if moved_tables is None:
moved_tables = []
conn = self._vtdb_conn()
for db_type in servedfrom_db_types:
for tbl in moved_tables:
try:
rows = conn._execute("select * from %s" % tbl, {}, destination_ks, db_type, keyranges=[keyrange.KeyRange(keyrange_constants.NON_PARTIAL_KEYRANGE)])
logging.debug("Select on %s.%s returned %d rows" % (db_type, tbl, len(rows)))
except Exception, e:
self.fail("Execute failed w/ exception %s" % str(e))
def _check_stats(self):
v = utils.vtgate.get_vars()
self.assertEqual(v['VttabletCall']['Histograms']['Execute.source_keyspace.0.replica']['Count'], 2, "unexpected value for VttabletCall(Execute.source_keyspace.0.replica) inside %s" % str(v))
self.assertEqual(v['VtgateApi']['Histograms']['ExecuteKeyRanges.destination_keyspace.master']['Count'], 6, "unexpected value for VtgateApi(ExecuteKeyRanges.destination_keyspace.master) inside %s" % str(v))
self.assertEqual(len(v['VtgateApiErrorCounts']), 0, "unexpected errors for VtgateApiErrorCounts inside %s" % str(v))
self.assertEqual(
v['ResilientSrvTopoServerEndPointsReturnedCount']['test_nj.source_keyspace.0.master'] /
v['ResilientSrvTopoServerEndPointQueryCount']['test_nj.source_keyspace.0.master'],
1, "unexpected EndPointsReturnedCount inside %s" % str(v))
self.assertNotIn(
'test_nj.source_keyspace.0.master', v['ResilientSrvTopoServerEndPointDegradedResultCount'],
"unexpected EndPointDegradedResultCount inside %s" % str(v))
if __name__ == '__main__':
vertical_split.client_type = vertical_split.VTGATE
utils.main()