зеркало из https://github.com/github/vitess-gh.git
test: Fix all pylint issues in utils.py, tabletmanager.py and update_stream.py.
This commit is contained in:
Родитель
d4bdaa1263
Коммит
00cd50edab
|
@ -270,9 +270,11 @@ class TestTabletManager(unittest.TestCase):
|
|||
tablet_62344.kill_vttablet()
|
||||
|
||||
def test_restart(self):
|
||||
"""test_restart tests that when starting a second vttablet with the same
|
||||
configuration as another one, it will kill the previous process
|
||||
and take over listening on the socket.
|
||||
"""Test restart behavior of vttablet.
|
||||
|
||||
Tests that when starting a second vttablet with the same configuration as
|
||||
another one, it will kill the previous process and take over listening on
|
||||
the socket.
|
||||
|
||||
If vttablet listens to other ports (like gRPC), this feature will
|
||||
break. We believe it is not widely used, so we're OK with this for now.
|
||||
|
@ -518,6 +520,7 @@ class TestTabletManager(unittest.TestCase):
|
|||
|
||||
def test_no_mysql_healthcheck(self):
|
||||
"""This test starts a vttablet with no mysql port, while mysql is down.
|
||||
|
||||
It makes sure vttablet will start properly and be unhealthy.
|
||||
Then we start mysql, and make sure vttablet becomes healthy.
|
||||
"""
|
||||
|
|
|
@ -1,10 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import warnings
|
||||
# Dropping a table inexplicably produces a warning despite
|
||||
# the "IF EXISTS" clause. Squelch these warnings.
|
||||
warnings.simplefilter('ignore')
|
||||
|
||||
import logging
|
||||
import traceback
|
||||
import threading
|
||||
|
@ -170,7 +165,7 @@ class TestUpdateStream(unittest.TestCase):
|
|||
try:
|
||||
for _ in replica_conn.stream_update(start_position):
|
||||
break
|
||||
except Exception as e:
|
||||
except dbexceptions.DatabaseError as e:
|
||||
self.assertIn('update stream service is not enabled', str(e))
|
||||
replica_conn.close()
|
||||
|
||||
|
@ -202,7 +197,7 @@ class TestUpdateStream(unittest.TestCase):
|
|||
if stream_event.category == update_stream.StreamEvent.DML:
|
||||
logging.debug('Test Service Enabled: Pass')
|
||||
break
|
||||
except Exception as e:
|
||||
except dbexceptions.DatabaseError as e:
|
||||
self.fail('Exception in getting stream from replica: %s\n Traceback %s' %
|
||||
(str(e), traceback.format_exc()))
|
||||
thd.join(timeout=30)
|
||||
|
@ -254,10 +249,11 @@ class TestUpdateStream(unittest.TestCase):
|
|||
return
|
||||
|
||||
def test_stream_parity(self):
|
||||
"""test_stream_parity checks the parity of streams received
|
||||
from master and replica for the same writes. Also tests
|
||||
transactions are retrieved properly.
|
||||
"""Tests parity of streams between master and replica for the same writes.
|
||||
|
||||
Also tests transactions are retrieved properly.
|
||||
"""
|
||||
|
||||
global master_start_position
|
||||
|
||||
timeout = 30
|
||||
|
|
|
@ -9,7 +9,7 @@ import shlex
|
|||
import shutil
|
||||
import signal
|
||||
import socket
|
||||
from subprocess import Popen, CalledProcessError, PIPE
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
@ -244,12 +244,12 @@ def run(cmd, trap_output=False, raise_on_error=True, **kargs):
|
|||
else:
|
||||
args = cmd
|
||||
if trap_output:
|
||||
kargs['stdout'] = PIPE
|
||||
kargs['stderr'] = PIPE
|
||||
kargs['stdout'] = subprocess.PIPE
|
||||
kargs['stderr'] = subprocess.PIPE
|
||||
logging.debug(
|
||||
'run: %s %s', str(cmd),
|
||||
', '.join('%s=%s' % x for x in kargs.iteritems()))
|
||||
proc = Popen(args, **kargs)
|
||||
proc = subprocess.Popen(args, **kargs)
|
||||
proc.args = args
|
||||
stdout, stderr = proc.communicate()
|
||||
if proc.returncode:
|
||||
|
@ -268,13 +268,13 @@ def run_fail(cmd, **kargs):
|
|||
args = shlex.split(cmd)
|
||||
else:
|
||||
args = cmd
|
||||
kargs['stdout'] = PIPE
|
||||
kargs['stderr'] = PIPE
|
||||
kargs['stdout'] = subprocess.PIPE
|
||||
kargs['stderr'] = subprocess.PIPE
|
||||
if options.verbose == 2:
|
||||
logging.debug(
|
||||
'run: (expect fail) %s %s', cmd,
|
||||
', '.join('%s=%s' % x for x in kargs.iteritems()))
|
||||
proc = Popen(args, **kargs)
|
||||
proc = subprocess.Popen(args, **kargs)
|
||||
proc.args = args
|
||||
stdout, stderr = proc.communicate()
|
||||
if proc.returncode == 0:
|
||||
|
@ -297,7 +297,7 @@ def run_bg(cmd, **kargs):
|
|||
args = shlex.split(cmd)
|
||||
else:
|
||||
args = cmd
|
||||
proc = Popen(args=args, **kargs)
|
||||
proc = subprocess.Popen(args=args, **kargs)
|
||||
proc.args = args
|
||||
_add_proc(proc)
|
||||
return proc
|
||||
|
@ -315,7 +315,8 @@ def wait_procs(proc_list, raise_on_error=True):
|
|||
if options.verbose >= 1 and proc.returncode not in (-9,):
|
||||
sys.stderr.write('proc failed: %s %s\n' % (proc.returncode, proc.args))
|
||||
if raise_on_error:
|
||||
raise CalledProcessError(proc.returncode, ' '.join(proc.args))
|
||||
raise subprocess.CalledProcessError(proc.returncode,
|
||||
' '.join(proc.args))
|
||||
|
||||
|
||||
def validate_topology(ping_tablets=False):
|
||||
|
@ -359,16 +360,13 @@ def wait_step(msg, timeout, sleep_time=1.0):
|
|||
|
||||
# vars helpers
|
||||
def get_vars(port):
|
||||
"""Returns the dict for vars from a vtxxx process.
|
||||
|
||||
Returns: None if we can't get them.
|
||||
"""
|
||||
"""Returns the dict for vars from a vtxxx process. None if not available."""
|
||||
try:
|
||||
url = 'http://localhost:%d/debug/vars' % int(port)
|
||||
f = urllib2.urlopen(url)
|
||||
data = f.read()
|
||||
f.close()
|
||||
except:
|
||||
except urllib2.URLError:
|
||||
return None
|
||||
try:
|
||||
return json.loads(data)
|
||||
|
@ -470,8 +468,13 @@ def wait_for_tablet_type(tablet_alias, expected_type, timeout=10):
|
|||
def wait_for_replication_pos(tablet_a, tablet_b, timeout=60.0):
|
||||
"""Waits for tablet B to catch up to the replication position of tablet A.
|
||||
|
||||
If the replication position does not catch up within timeout seconds, it will
|
||||
raise a TestError.
|
||||
Args:
|
||||
tablet_a: tablet Object for tablet A.
|
||||
tablet_b: tablet Object for tablet B.
|
||||
timeout: Timeout in seconds.
|
||||
|
||||
Raises:
|
||||
TestError: replication position did not catch up within timeout seconds.
|
||||
"""
|
||||
replication_pos_a = mysql_flavor().master_position(tablet_a)
|
||||
while True:
|
||||
|
@ -494,8 +497,7 @@ class VtGate(object):
|
|||
"""VtGate object represents a vtgate process."""
|
||||
|
||||
def __init__(self, port=None):
|
||||
"""Creates the Vtgate instance and reserve the ports if necessary.
|
||||
"""
|
||||
"""Creates the Vtgate instance and reserve the ports if necessary."""
|
||||
self.port = port or environment.reserve_ports(1)
|
||||
if protocols_flavor().vtgate_protocol() == 'grpc':
|
||||
self.grpc_port = environment.reserve_ports(1)
|
||||
|
@ -506,11 +508,8 @@ class VtGate(object):
|
|||
topo_impl=None, cache_ttl='1s',
|
||||
timeout_total='4s', timeout_per_conn='2s',
|
||||
extra_args=None):
|
||||
"""Starts the process for this vtgate instance.
|
||||
"""Start vtgate. Saves it into the global vtgate variable if not set yet."""
|
||||
|
||||
If no other instance has been started, saves it into the global
|
||||
vtgate variable.
|
||||
"""
|
||||
args = environment.binary_args('vtgate') + [
|
||||
'-port', str(self.port),
|
||||
'-cell', cell,
|
||||
|
@ -725,13 +724,7 @@ def run_vtworker(clargs, auto_log=False, expect_fail=False, **kwargs):
|
|||
|
||||
|
||||
def run_vtworker_bg(clargs, auto_log=False, **kwargs):
|
||||
"""Starts a background vtworker process.
|
||||
|
||||
Returns:
|
||||
proc - process returned by subprocess.Popen
|
||||
port - int with the port number that the vtworker is running with
|
||||
rpc_port - int with the port number of the RPC interface
|
||||
"""
|
||||
"""Starts a background vtworker process."""
|
||||
cmd, port, rpc_port = _get_vtworker_cmd(clargs, auto_log)
|
||||
return run_bg(cmd, **kwargs), port, rpc_port
|
||||
|
||||
|
@ -739,6 +732,10 @@ def run_vtworker_bg(clargs, auto_log=False, **kwargs):
|
|||
def _get_vtworker_cmd(clargs, auto_log=False):
|
||||
"""Assembles the command that is needed to run a vtworker.
|
||||
|
||||
Args:
|
||||
clargs: Command line arguments passed to vtworker.
|
||||
auto_log: If true, set --stderrthreshold according to the test log level.
|
||||
|
||||
Returns:
|
||||
cmd - list of cmd arguments, can be passed to any `run`-like functions
|
||||
port - int with the port number that the vtworker is running with
|
||||
|
@ -950,14 +947,7 @@ def check_shard_query_services(
|
|||
|
||||
def check_tablet_query_service(
|
||||
testcase, tablet, serving, tablet_control_disabled):
|
||||
"""Check that the query service is enabled or disabled on the tablet.
|
||||
|
||||
It will also check if the tablet control status is the reason for
|
||||
being enabled / disabled.
|
||||
|
||||
It will also run a remote RunHealthCheck to be sure it doesn't change
|
||||
the serving state.
|
||||
"""
|
||||
"""Check that the query service is enabled or disabled on the tablet."""
|
||||
tablet_vars = get_vars(tablet.port)
|
||||
if serving:
|
||||
expected_state = 'SERVING'
|
||||
|
@ -977,6 +967,7 @@ def check_tablet_query_service(
|
|||
testcase.assertNotIn('Query Service disabled by TabletControl', status)
|
||||
|
||||
if tablet.tablet_type == 'rdonly':
|
||||
# Run RunHealthCheck to be sure the tablet doesn't change its serving state.
|
||||
run_vtctl(['RunHealthCheck', tablet.tablet_alias, 'rdonly'],
|
||||
auto_log=True)
|
||||
|
||||
|
@ -1116,7 +1107,7 @@ class Vtctld(object):
|
|||
if protocol == 'grpc':
|
||||
# import the grpc vtctl client implementation, change the port
|
||||
if python:
|
||||
from vtctl import grpc_vtctl_client
|
||||
from vtctl import grpc_vtctl_client # pylint: disable=g-import-not-at-top,unused-variable
|
||||
rpc_port = self.grpc_port
|
||||
return (protocol, '%s:%d' % (socket.getfqdn(), rpc_port))
|
||||
|
||||
|
@ -1155,6 +1146,9 @@ def uint64_to_hex(integer):
|
|||
Args:
|
||||
integer: the value to print.
|
||||
|
||||
Returns:
|
||||
String with the hex representation.
|
||||
|
||||
Raises:
|
||||
ValueError: if the integer is out of range.
|
||||
"""
|
||||
|
|
Загрузка…
Ссылка в новой задаче