Making zkocc process management and logging better in tests.

LGTM Mike.
This commit is contained in:
Alain Jobart 2012-12-06 11:31:48 -08:00
Родитель bf7e1ccd3c
Коммит 882598740f
6 изменённых файлов: 96 добавлений и 54 удалений

Просмотреть файл

@ -115,15 +115,15 @@ def run_test_complex_schema():
shard_1_replica1.start_vttablet()
# make sure all replication is good
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/0 ' + shard_0_master.zk_tablet_path)
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/1 ' + shard_1_master.zk_tablet_path)
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/0 ' + shard_0_master.zk_tablet_path, auto_log=True)
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/1 ' + shard_1_master.zk_tablet_path, auto_log=True)
# shard 0: apply the schema using a complex schema upgrade, no
# reparenting yet
utils.run_vtctl(['ApplySchemaShard',
'-sql='+create_vt_select_test[0],
'/zk/global/vt/keyspaces/test_keyspace/shards/0'],
log_level='INFO')
auto_log=True)
# check all expected hosts have the change:
# - master won't have it as it's a complex change
@ -142,7 +142,7 @@ def run_test_complex_schema():
'-stop-replication',
'-sql='+create_vt_select_test[0],
shard_0_master.zk_tablet_path],
log_level='INFO')
auto_log=True)
check_tables(shard_0_master, 1)
# shard 0: apply schema change to just backup directly
@ -151,7 +151,7 @@ def run_test_complex_schema():
'-stop-replication',
'-sql='+create_vt_select_test[0],
shard_0_backup.zk_tablet_path],
log_level='INFO')
auto_log=True)
check_tables(shard_0_backup, 1)
# shard 0: apply new schema change, with reparenting
@ -159,7 +159,7 @@ def run_test_complex_schema():
'-new-parent='+shard_0_replica1.zk_tablet_path,
'-sql='+create_vt_select_test[1],
'/zk/global/vt/keyspaces/test_keyspace/shards/0'],
log_level='INFO')
auto_log=True)
check_tables(shard_0_master, 1)
check_tables(shard_0_replica1, 2)
check_tables(shard_0_replica2, 2)
@ -183,12 +183,12 @@ def run_test_complex_schema():
'-simple',
'-sql='+create_vt_select_test[0],
'/zk/global/vt/keyspaces/test_keyspace/shards/1'],
log_level='INFO')
auto_log=True)
utils.run_vtctl(['ApplySchemaShard',
'-simple',
'-sql='+create_vt_select_test[1],
'/zk/global/vt/keyspaces/test_keyspace/shards/1'],
log_level='INFO')
auto_log=True)
check_tables(shard_1_master, 2)
check_tables(shard_1_replica1, 2)
@ -197,7 +197,7 @@ def run_test_complex_schema():
'-simple',
'-sql='+create_vt_select_test[2],
'/zk/global/vt/keyspaces/test_keyspace'],
log_level='INFO')
auto_log=True)
# check all expected hosts have the change
check_tables(shard_0_master, 1) # was stuck a long time ago as scrap
@ -212,7 +212,7 @@ def run_test_complex_schema():
utils.run_vtctl(['ApplySchemaKeyspace',
'-sql='+create_vt_select_test[3],
'/zk/global/vt/keyspaces/test_keyspace'],
log_level='INFO')
auto_log=True)
# check all expected hosts have the change:
# - master won't have it as it's a complex change
@ -234,7 +234,7 @@ def run_test_complex_schema():
if oldCount <= 5:
raise utils.TestError('Not enough actionlog before: %u' % oldCount)
utils.run_vtctl('PruneActionLogs -keep-count=5 '+shard_0_replica1.zk_tablet_path+'/actionlog', log_level='INFO')
utils.run_vtctl('PruneActionLogs -keep-count=5 '+shard_0_replica1.zk_tablet_path+'/actionlog', auto_log=True)
out, err = utils.run(vtroot+'/bin/zk ls '+shard_0_replica1.zk_tablet_path+'/actionlog', trap_output=True)
newLines = out.splitlines()

Просмотреть файл

@ -19,7 +19,6 @@ shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
def setup():
utils.prog_compile(['zkocc'])
utils.zk_setup()
setup_procs = [
@ -132,10 +131,10 @@ def run_test_sharding():
shard_0_replica.zk_tablet_path])
# start zkocc, we'll use it later
zkocc = utils.run_bg(vtroot+'/bin/zkocc -port=14850 test_nj')
zkocc = utils.zkocc_start()
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/0000000000000000-8000000000000000 ' + shard_0_master.zk_tablet_path)
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/8000000000000000-0000000000000000 ' + shard_1_master.zk_tablet_path)
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/0000000000000000-8000000000000000 ' + shard_0_master.zk_tablet_path, auto_log=True)
utils.run_vtctl('ReparentShard -force /zk/global/vt/keyspaces/test_keyspace/shards/8000000000000000-0000000000000000 ' + shard_1_master.zk_tablet_path, auto_log=True)
# apply the schema on the second shard using a simple schema upgrade
utils.run_vtctl(['ApplySchemaShard',
@ -161,10 +160,7 @@ def run_test_sharding():
"10\ttest 10"])
# write a value, re-read them all
out, err = utils.vttablet_query(3803, "/zk/test_nj/vt/ns/test_keyspace/master", "insert into vt_select_test (id, msg) values (2, 'test 2')", driver="vtdb", verbose=True)
print out
print err
utils.vttablet_query(3803, "/zk/test_nj/vt/ns/test_keyspace/master", "insert into vt_select_test (id, msg) values (2, 'test 2')", driver="vtdb", verbose=True)
check_rows(["Index\tid\tmsg",
"1\ttest 1",
"2\ttest 2",

Просмотреть файл

@ -1,9 +1,8 @@
import os
import json
import os
import shutil
import sys
import time
import urllib
import warnings
# Dropping a table inexplicably produces a warning despite
# the "IF EXISTS" clause. Squelch these warnings.
@ -259,22 +258,9 @@ class Tablet(object):
return self.proc
def get_vttablet_vars(self, port=None):
"""
Returns the dict for vars, from the vttablet process, or None
if we can't get them.
"""
try:
f = urllib.urlopen('http://localhost:%u/debug/vars' % (port or self.port))
data = f.read()
f.close()
except:
return None
return json.loads(data)
def wait_for_vttablet_state(self, expected, timeout=5.0, port=None):
while True:
v = self.get_vttablet_vars(port)
v = utils.get_vars(port or self.port)
if v == None:
utils.debug(" vttablet not answering at /debug/vars, waiting...")
else:
@ -283,7 +269,7 @@ class Tablet(object):
else:
s = v['Voltron']['State']
if s != expected:
utils.debug(" vttablet in state %u != %u" % (s, expected))
utils.debug(" vttablet in state %s != %s" % (s, expected))
else:
break
@ -291,7 +277,7 @@ class Tablet(object):
time.sleep(0.1)
timeout -= 0.1
if timeout <= 0:
raise utils.TestError("timeout waiting for state %u" % expected)
raise utils.TestError("timeout waiting for state %s" % expected)
def _write_db_configs_file(self):
config = dict(self.default_db_config)

Просмотреть файл

@ -479,7 +479,7 @@ def run_test_reparent_down_master():
utils.run_fail(vtroot+'/bin/vtctl -logfile=/dev/null -log.level=WARNING -wait-time 5s ScrapTablet ' + tablet_62344.zk_tablet_alias)
# Force the scrap action in zk even though tablet is not accessible.
utils.run_vtctl('ScrapTablet -force -skip-rebuild ' + tablet_62344.zk_tablet_path)
utils.run_vtctl('ScrapTablet -force -skip-rebuild ' + tablet_62344.zk_tablet_path, auto_log=True)
utils.run_fail(vtroot+'/bin/vtctl -logfile=/dev/null -log.level=WARNING ChangeSlaveType -force %s idle' %
tablet_62344.zk_tablet_path)

Просмотреть файл

@ -9,7 +9,7 @@ import socket
from subprocess import Popen, CalledProcessError, PIPE
import sys
import time
import types
import urllib
import MySQLdb
@ -90,7 +90,7 @@ def kill_sub_process(proc):
# run in foreground, possibly capturing output
def run(cmd, trap_output=False, raise_on_error=True, **kargs):
if isinstance(cmd, types.StringTypes):
if isinstance(cmd, str):
args = shlex.split(cmd)
else:
args = cmd
@ -112,7 +112,10 @@ def run(cmd, trap_output=False, raise_on_error=True, **kargs):
# run sub-process, expects failure
def run_fail(cmd, **kargs):
args = shlex.split(cmd)
if isinstance(cmd, str):
args = shlex.split(cmd)
else:
args = cmd
kargs['stdout'] = PIPE
kargs['stderr'] = PIPE
if options.verbose:
@ -128,7 +131,10 @@ def run_fail(cmd, **kargs):
def run_bg(cmd, **kargs):
if options.verbose:
print "run:", cmd, ', '.join('%s=%s' % x for x in kargs.iteritems())
args = shlex.split(cmd)
if isinstance(cmd, str):
args = shlex.split(cmd)
else:
args = cmd
proc = Popen(args=args, **kargs)
proc.args = args
_add_proc(proc)
@ -199,13 +205,64 @@ def zk_check(ping_tablets=False):
else:
run_vtctl('Validate /zk/global/vt/keyspaces')
# vars helpers
def get_vars(port):
"""
Returns the dict for vars, from a vtxxx process, or None
if we can't get them.
"""
try:
f = urllib.urlopen('http://localhost:%u/debug/vars' % port)
data = f.read()
f.close()
except:
return None
return json.loads(data)
# zkocc helpers
def zkocc_start(port=14850, cells=['test_nj'], extra_params=[]):
prog_compile(['zkocc'])
logfile = tmp_root + '/zkocc_%u.log' % port
args = [vtroot+'/bin/zkocc',
'-port', str(port),
'-logfile', logfile,
'-log.level', 'INFO',
] + extra_params + cells
sp = run_bg(args)
# wait for vars
timeout = 5.0
while True:
v = get_vars(port)
if v == None:
debug(" zkocc not answering at /debug/vars, waiting...")
else:
break
debug("sleeping a bit while we wait")
time.sleep(0.1)
timeout -= 0.1
if timeout <= 0:
raise TestError("timeout waiting for zkocc")
return sp
def zkocc_kill(sp):
kill_sub_process(sp)
sp.wait()
# vtctl helpers
def run_vtctl(clargs, log_level='WARNING', **kwargs):
def run_vtctl(clargs, log_level='WARNING', auto_log=False, **kwargs):
if auto_log:
if options.verbose:
log_level='INFO'
else:
log_level='ERROR'
prog_compile(['vtctl'])
args = [vtroot+'/bin/vtctl',
'-log.level='+log_level,
'-logfile=/dev/null']
if isinstance(clargs, types.StringTypes):
if isinstance(clargs, str):
cmd = " ".join(args) + ' ' + clargs
else:
cmd = args + clargs

Просмотреть файл

@ -1,6 +1,7 @@
#!/usr/bin/python
from optparse import OptionParser
import logging
import os
import socket
import tempfile
@ -16,8 +17,7 @@ vtroot = os.environ['VTROOT']
hostname = socket.gethostname()
def setup():
utils.prog_compile(['zkocc',
'zkclient2',
utils.prog_compile(['zkclient2',
])
utils.zk_setup()
@ -62,8 +62,7 @@ def _check_zk_output(cmd, expected):
if out != expected:
raise utils.TestError('unexpected zk zkocc output: ', cmd, "is", out, "but expected", expected)
if utils.options.verbose:
print "Matched:", out
utils.debug("Matched: " + out)
def _format_time(timeFromBson):
(tz, val) = timeFromBson
@ -75,7 +74,7 @@ def run_test_zkocc():
_populate_zk()
# preload the test_nj cell
zkocc_14850 = utils.run_bg(vtroot+'/bin/zkocc -port=14850 -connect-timeout=2s -cache-refresh-interval=1s test_nj')
zkocc_14850 = utils.zkocc_start(extra_params=['-connect-timeout=2s', '-cache-refresh-interval=1s'])
time.sleep(1)
# create a python client. The first address is bad, will test the retry logic
@ -85,12 +84,14 @@ def run_test_zkocc():
# test failure for a python client that cannot connect
bad_zkocc_client = zkocc.ZkOccConnection("localhost:14848,localhost:14849", 30)
bad_zkocc_client.dial()
logging.getLogger().setLevel(logging.ERROR)
try:
bad_zkocc_client.get("/zk/test_nj/zkocc1/data1")
raise utils.TestError('exception expected')
except zkocc.ZkOccError as e:
if str(e) != "zkocc get command failed 2 times: ('get failed', GoRpcError(error(111, 'Connection refused'), 'ZkReader.Get'))":
raise utils.TestError('Unexpected exception: ', str(e))
logging.getLogger().setLevel(logging.WARNING)
# get test
out, err = utils.run(vtroot+'/bin/zkclient2 -server localhost:14850 /zk/test_nj/zkocc1/data1', trap_output=True)
@ -167,7 +168,7 @@ Stale = false
utils.kill_sub_process(querier)
print "Checking", filename
utils.debug("Checking " + filename)
fd = open(filename, "r")
state = 0
for line in fd:
@ -190,27 +191,29 @@ Stale = false
raise utils.TestError('unexpected ended stale state')
fd.close()
utils.kill_sub_process(zkocc_14850)
zkocc_14850.wait()
utils.zkocc_kill(zkocc_14850)
# check that after the server is gone, the python client fails correctly
logging.getLogger().setLevel(logging.ERROR)
try:
zkocc_client.get("/zk/test_nj/zkocc1/data1")
raise utils.TestError('exception expected')
except zkocc.ZkOccError as e:
if str(e) != "zkocc get command failed 2 times: ('get failed', GoRpcError(error(111, 'Connection refused'), 'ZkReader.Get'))":
raise utils.TestError('Unexpected exception: ', str(e))
logging.getLogger().setLevel(logging.WARNING)
@utils.test_case
def run_test_zkocc_qps():
_populate_zk()
# preload the test_nj cell
zkocc_14850 = utils.run_bg(vtroot+'/bin/zkocc -port=14850 test_nj')
zkocc_14850 = utils.zkocc_start()
qpser = utils.run_bg(vtroot+'/bin/zkclient2 -server localhost:14850 -mode qps /zk/test_nj/zkocc1/data1 /zk/test_nj/zkocc1/data2')
time.sleep(10)
utils.kill_sub_process(qpser)
utils.kill_sub_process(zkocc_14850)
utils.zkocc_kill(zkocc_14850)
def run_all():
run_test_zkocc()