Refactor connection tests to start their own MySQL, test for timeouts.

This commit is contained in:
Ric Szopa 2012-11-06 13:48:01 -08:00
Родитель 1bea87ed04
Коммит 94e74787bf
3 изменённых файлов: 217 добавлений и 122 удалений

Просмотреть файл

@ -56,6 +56,7 @@ integration_test:
cd test ; ./sharded.py $$VT_TEST_FLAGS
cd test ; ./tabletmanager.py $$VT_TEST_FLAGS
cd test ; ./zkocc.py $$VT_TEST_FLAGS
cd test ; ./connection_test.py
clean:
cd go/cmd/mysqlctl; go clean

Просмотреть файл

@ -1,122 +0,0 @@
# coding: utf-8
import hmac
import json
import optparse
import os
import subprocess
import time
import unittest
from vtdb import vt_occ2 as db
from vtdb import dbexceptions
from vtdb import tablet2
from net import gorpc
LOGFILE = "/tmp/vtocc.log"
QUERYLOGFILE = "/tmp/vtocc_queries.log"
# This is a VtOCCConnection that doesn't attempt to do authentication.
class BareOCCConnection(db.VtOCCConnection):
def dial(self):
tablet2.TabletConnection.dial(self)
@property
def uri(self):
return 'http://%s/_bson_rpc_/auth' % self.addr
class TestAuthentication(unittest.TestCase):
vtroot = os.getenv("VTROOT")
def setUp(self):
for i in range(30):
try:
self.conn = BareOCCConnection("localhost:9461", None, 2)
return
except dbexceptions.OperationalError:
if i == 29:
raise
time.sleep(1)
@classmethod
def setUpVTOCC(klass, dbconfig, authcredentials):
with open(dbconfig) as f:
klass.cfg = json.load(f)
with open(authcredentials) as f:
klass.credentials = json.load(f)
klass.vtstderr = open("/tmp/vtocc_stderr.log", "a+")
klass.process = subprocess.Popen([klass.vtroot +"/bin/vtocc",
"-port", "9461",
"-auth-credentials", authcredentials,
"-dbconfig", dbconfig,
"-logfile", LOGFILE,
"-querylog", QUERYLOGFILE],
stderr=klass.vtstderr)
time.sleep(1)
@classmethod
def tearDownVTOCC(klass):
try:
klass.process.kill()
klass.process.wait()
except AttributeError:
pass
def call(self, *args, **kwargs):
return self.conn.client.call(*args, **kwargs)
def authenticate(self, user, password):
challenge = self.call('AuthenticatorCRAMMD5.GetNewChallenge', "").reply['Challenge']
proof = user + " " + hmac.HMAC(str(password), challenge).hexdigest()
return self.call('AuthenticatorCRAMMD5.Authenticate', {"Proof": proof})
def test_correct_credentials(self):
self.authenticate("ala", "ma kota")
def test_secondary_credentials(self):
self.authenticate("ala", u"miala kota")
def test_incorrect_user(self):
self.assertRaises(gorpc.AppError, self.authenticate, "romek", "ma raka")
def test_incorrect_credentials(self):
self.assertRaises(gorpc.AppError, self.authenticate, "ala", "nie ma kota")
def test_challenge_is_used(self):
challenge = ""
proof = "ala " + hmac.HMAC("ma kota", challenge).hexdigest()
self.assertRaises(gorpc.AppError, self.call, 'AuthenticatorCRAMMD5.Authenticate', {"Proof": proof})
def test_only_few_requests_are_allowed(self):
for i in range(4):
try:
self.call('AuthenticatorCRAMMD5.GetNewChallenge', "")
except gorpc.GoRpcError as e:
break
else:
self.fail("Too many requests were allowed (%s)." % (i + 1))
def test_authenticated_methods_are_available(self):
self.authenticate("ala", "ma kota")
self.call('OccManager.GetSessionId', '<test db name>')
if __name__=="__main__":
parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-c", "--dbconfig", action="store", dest="dbconfig", default="dbtest.json",
help="json db config file")
parser.add_option("-a", "--authentication-credentials", action="store", dest="auth_credentials", default="authcredentials_test.json",
help="json CRAM-MD5 credentials file")
(options, args) = parser.parse_args()
try:
TestAuthentication.setUpVTOCC(options.dbconfig, options.auth_credentials)
unittest.main(argv=["auth_test.py"])
finally:
print "Waiting for vtocc to terminate...",
TestAuthentication.tearDownVTOCC()
print "OK"

216
test/connection_test.py Executable file
Просмотреть файл

@ -0,0 +1,216 @@
#!/usr/bin/env python
# coding: utf-8
import hmac
import json
import optparse
import os
import subprocess
import sys
import shutil
import time
import unittest
import MySQLdb
from net import gorpc
from vtdb import vt_occ2 as db
from vtdb import tablet2
from vtdb import dbexceptions
LOGFILE = "/tmp/vtocc.log"
QUERYLOGFILE = "/tmp/vtocc_queries.log"
# This is a VtOCCConnection that doesn't attempt to do authentication.
class BareOCCConnection(db.VtOCCConnection):
def dial(self):
tablet2.TabletConnection.dial(self)
@property
def uri(self):
return 'http://%s/_bson_rpc_/auth' % self.addr
class BaseTest(unittest.TestCase):
vtroot = os.getenv("VTROOT")
tabletuid = "9460"
mysql_port = 9460
vtocc_port = 9461
mysqldir = "/vt/vt_0000009460"
mysql_socket = os.path.join(mysqldir, "mysql.sock")
credentials = {"ala": ["ma kota", "miala kota"]}
credentials_file = os.path.join(mysqldir, 'authcredentials.json')
dbconfig_file = os.path.join(mysqldir, "dbconf.json")
dbconfig = {
'charset': 'utf8',
'dbname': 'vt_test',
'host': 'localhost',
'unix_socket': mysql_socket,
'uname': 'vt_dba', # use vt_dba as some tests depend on 'drop'
}
@property
def vtocc_uri(self):
return "localhost:%s" % self.vtocc_port
@classmethod
def dump_config_files(klass):
with open(klass.credentials_file, 'w') as f:
json.dump(klass.credentials, f)
with open(klass.dbconfig_file, 'w') as f:
json.dump(klass.dbconfig, f)
@classmethod
def setUpClass(klass):
os.mkdir(klass.mysqldir)
klass.dump_config_files()
klass.start_mysql()
klass.start_vtocc()
@classmethod
def start_vtocc(klass):
klass.user = str(klass.credentials.keys()[0])
klass.password = str(klass.credentials[klass.user][0])
klass.secondary_password = str(klass.credentials[klass.user][1])
klass.vtstderr = open("/tmp/vtocc_stderr.log", "a+")
# TODO(szopa): authcredentials
klass.process = subprocess.Popen([klass.vtroot +"/bin/vtocc",
"-port", str(klass.vtocc_port),
"-auth-credentials", klass.credentials_file,
"-dbconfig", klass.dbconfig_file,
"-logfile", LOGFILE,
"-querylog", QUERYLOGFILE],
stderr=klass.vtstderr)
time.sleep(1)
connection = db.VtOCCConnection("localhost:%s" % klass.vtocc_port, klass.dbconfig['dbname'], timeout=10, user=klass.user, password=klass.password)
connection.dial()
cursor = connection.cursor()
cursor.execute("create table if not exists connection_test (c int)")
connection.begin()
cursor.execute("delete from connection_test")
cursor.execute("insert into connection_test values (1), (2), (3), (4)")
connection.commit()
@classmethod
def tearDownClass(klass):
try:
klass.process.kill()
klass.process.wait()
except AttributeError:
pass
# stop mysql, delete directory
subprocess.call([
klass.vtroot+"/bin/mysqlctl",
"-tablet-uid", klass.tabletuid,
"-force", "teardown"
])
try:
shutil.rmtree(klass.mysqldir)
except OSError:
pass
@classmethod
def start_mysql(klass):
res = subprocess.call([
os.path.join(klass.vtroot+"/bin/mysqlctl"),
"-tablet-uid", klass.tabletuid,
"-port", str(klass.vtocc_port),
"-mysql-port", str(klass.mysql_port),
"init"
])
if res != 0:
raise Exception("cannot start mysql")
klass.mysql_socket = os.path.join(klass.mysqldir, "mysql.sock")
res = subprocess.call([
"mysql",
"-S", klass.mysql_socket,
"-u", "vt_dba",
"-e", "create database vt_test ; set global read_only = off"])
if res != 0:
raise Exception("Cannot create vt_test database")
class TestAuthentication(BaseTest):
def setUp(self):
for i in range(30):
try:
self.conn = BareOCCConnection(self.vtocc_uri, None, 2)
self.conn.dial()
return
except dbexceptions.OperationalError:
if i == 29:
raise
time.sleep(1)
def call(self, *args, **kwargs):
return self.conn.client.call(*args, **kwargs)
def authenticate(self, user, password):
challenge = self.call('AuthenticatorCRAMMD5.GetNewChallenge', "").reply['Challenge']
proof = user + " " + hmac.HMAC(str(password), challenge).hexdigest()
return self.call('AuthenticatorCRAMMD5.Authenticate', {"Proof": proof})
def test_correct_credentials(self):
self.authenticate(self.user, self.password)
def test_secondary_credentials(self):
self.authenticate(self.user, self.secondary_password)
def test_incorrect_user(self):
self.assertRaises(gorpc.AppError, self.authenticate, "romek", "ma raka")
def test_incorrect_credentials(self):
self.assertRaises(gorpc.AppError, self.authenticate, "ala", "nie ma kota")
def test_challenge_is_used(self):
challenge = ""
proof = "%s %s" %(self.user, hmac.HMAC(self.password, challenge).hexdigest())
self.assertRaises(gorpc.AppError, self.call, 'AuthenticatorCRAMMD5.Authenticate', {"Proof": proof})
def test_only_few_requests_are_allowed(self):
for i in range(4):
try:
self.call('AuthenticatorCRAMMD5.GetNewChallenge', "")
except gorpc.GoRpcError as e:
break
else:
self.fail("Too many requests were allowed (%s)." % (i + 1))
def test_authenticated_methods_are_available(self):
self.authenticate(self.user, self.password)
self.call('OccManager.GetSessionId', self.dbconfig['dbname'])
class TestConnection(BaseTest):
def setUp(self):
self.connection = db.VtOCCConnection(self.vtocc_uri, self.dbconfig['dbname'], timeout=1, user=self.user, password=self.password)
self.connection.dial()
def test_reconnect(self):
cursor = self.connection.cursor()
cursor.execute("create table if not exists connection_test (c int)")
cursor.execute("select 1 from connection_test")
try:
cursor.execute("select sleep(1) from connection_test")
except MySQLdb.DatabaseError as e:
if "deadline exceeded" not in e.args[1]:
raise
else:
self.fail("Expected timeout error not raised")
cursor.execute("select 2 from connection_test")
if __name__=="__main__":
try:
BaseTest.setUpClass()
unittest.main(argv=["auth_test.py"])
finally:
print "Waiting for processes to terminate...",
BaseTest.tearDownClass()
print "OK"