treeherder/tests/conftest.py

543 строки
14 KiB
Python
Исходник Обычный вид История

import json
import os
2013-03-19 23:03:26 +04:00
import sys
from os.path import dirname
2014-07-03 17:56:45 +04:00
import kombu
2013-03-19 23:03:26 +04:00
import pytest
import responses
2014-07-03 17:56:45 +04:00
from django.core.management import call_command
from requests import Request
from requests_hawk import HawkAuth
from webtest.app import TestApp
2014-07-03 17:56:45 +04:00
from tests.sampledata import SampleData
from treeherder.client import TreeherderClient
from treeherder.config.wsgi import application
2014-07-03 17:56:45 +04:00
from treeherder.etl.oauth_utils import OAuthCredentials
def pytest_addoption(parser):
parser.addoption(
"--runslow",
action="store_true",
help="run slow tests",
)
def pytest_sessionstart(session):
"""
2013-04-16 22:00:34 +04:00
Set up the test environment.
2013-04-16 22:00:34 +04:00
Set DJANGO_SETTINGS_MODULE and sets up a test database.
2013-04-16 22:00:34 +04:00
"""
2013-03-19 23:03:26 +04:00
sys.path.append(dirname(dirname(__file__)))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "treeherder.config.settings")
from django.conf import settings
2015-01-30 01:20:37 +03:00
from django.test.runner import DiscoverRunner
2013-03-19 23:03:26 +04:00
# we don't actually let Django run the tests, but we need to use some
# methods of its runner for setup/teardown of dbs and some other things
2015-01-30 01:20:37 +03:00
session.django_runner = DiscoverRunner()
# this provides templates-rendered debugging info and locmem mail storage
session.django_runner.setup_test_environment()
settings.DATABASES["default"]["TEST_NAME"] = "test_treeherder"
2013-05-13 21:19:19 +04:00
# this makes celery calls synchronous, useful for unit testing
settings.CELERY_ALWAYS_EAGER = True
2013-05-15 16:54:42 +04:00
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
2013-05-13 21:19:19 +04:00
# Don't attempt to submit bug associations to Elasticsearch.
settings.MIRROR_CLASSIFICATIONS = False
2014-06-03 20:46:20 +04:00
# Reconfigure pulse to operate on default vhost of rabbitmq
settings.PULSE_URI = settings.BROKER_URL
settings.PULSE_EXCHANGE_NAMESPACE = 'test'
def pytest_sessionfinish(session):
"""Tear down the test environment, including databases."""
session.django_runner.teardown_test_environment()
def pytest_runtest_setup(item):
"""
2013-04-16 22:00:34 +04:00
Per-test setup.
- Add an option to run those tests marked as 'slow'
- Provide cache isolation incrementing the cache key prefix
- Drop and recreate tables in the master db
2013-04-16 22:00:34 +04:00
"""
if 'slow' in item.keywords and not item.config.getoption("--runslow"):
pytest.skip("need --runslow option to run")
increment_cache_key_prefix()
# this should provide isolation between tests.
call_command("init_master_db", interactive=False, skip_fixtures=True)
def pytest_runtest_teardown(item):
"""
2013-04-16 22:00:34 +04:00
Per-test teardown.
2013-04-18 19:49:19 +04:00
Roll back the Django ORM transaction and delete all the dbs created
between tests
2013-04-16 22:00:34 +04:00
"""
from treeherder.model.models import Datasource
2013-03-19 23:03:26 +04:00
ds_list = Datasource.objects.all()
for ds in ds_list:
ds.delete()
call_command("migrate", 'model', '0001_initial', interactive=False)
def increment_cache_key_prefix():
"""Increment a cache prefix to effectively clear the cache."""
from django.core.cache import cache
cache.key_prefix = ""
prefix_counter_cache_key = "treeherder-tests-key-prefix-counter"
try:
key_prefix_counter = cache.incr(prefix_counter_cache_key)
except ValueError:
key_prefix_counter = 0
cache.set(prefix_counter_cache_key, key_prefix_counter)
cache.key_prefix = "t{0}".format(key_prefix_counter)
2013-04-19 03:23:58 +04:00
@pytest.fixture()
def initial_data():
call_command('load_initial_data')
2014-06-02 23:15:13 +04:00
@pytest.fixture(scope='function')
def jm(request):
2013-04-18 19:49:19 +04:00
""" Give a test access to a JobsModel instance. """
from django.conf import settings
2013-04-16 22:00:34 +04:00
from treeherder.model.derived.jobs import JobsModel
model = JobsModel.create(settings.DATABASES["default"]["TEST_NAME"])
2013-04-19 21:58:46 +04:00
# patch in additional test-only procs on the datasources
add_test_procs_file(
model.get_dhub(),
model.get_datasource().key,
"jobs_test.json",
)
2013-09-04 18:38:59 +04:00
def fin():
model.disconnect()
request.addfinalizer(fin)
return model
2013-04-17 20:38:40 +04:00
def add_test_procs_file(dhub, key, filename):
"""Add an extra procs file in for testing purposes."""
test_proc_file = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
filename,
)
del dhub.procs[key]
proclist = dhub.data_sources[key]["procs"]
if test_proc_file not in proclist:
proclist.append(test_proc_file)
dhub.data_sources[key]["procs"] = proclist
dhub.load_procs(key)
@pytest.fixture()
def jobs_ds():
from django.conf import settings
from treeherder.model.models import Datasource
return Datasource.objects.create(project=settings.DATABASES["default"]["TEST_NAME"])
2013-04-20 05:06:00 +04:00
2013-04-19 03:47:10 +04:00
@pytest.fixture(scope='session')
2013-04-19 03:23:58 +04:00
def sample_data():
"""Returns a SampleData() object"""
from sampledata import SampleData
return SampleData()
2013-09-04 18:38:59 +04:00
@pytest.fixture(scope='session')
def test_base_dir():
return os.path.dirname(__file__)
@pytest.fixture
def sample_resultset(sample_data):
return sample_data.resultset_data
@pytest.fixture
def test_project():
from django.conf import settings
return settings.DATABASES["default"]["TEST_NAME"]
@pytest.fixture
def test_repository():
from django.conf import settings
from treeherder.model.models import Repository, RepositoryGroup
RepositoryGroup.objects.create(
name="development",
active_status="active",
description=""
)
return Repository.objects.create(
dvcs_type="hg",
name=settings.DATABASES["default"]["TEST_NAME"],
url="https://hg.mozilla.org/mozilla-central",
active_status="active",
codebase="gecko",
repository_group_id=1,
description=""
)
@pytest.fixture
def mock_log_parser(monkeypatch):
from celery import task
from treeherder.log_parser import tasks
@task
def task_mock(*args, **kwargs):
pass
monkeypatch.setattr(tasks,
'parse_log',
task_mock)
@pytest.fixture
def result_set_stored(jm, initial_data, sample_resultset):
2013-11-05 21:09:26 +04:00
jm.store_result_set_data(sample_resultset)
return sample_resultset
@pytest.fixture(scope='function')
def mock_get_resultset(monkeypatch, result_set_stored):
from treeherder.etl import common
2013-11-08 01:00:15 +04:00
def _get_resultset(params):
for k in params:
rev = params[k][0]
params[k] = {
rev: {
'id': 1,
'revision_hash': result_set_stored[0]['revision_hash']
}
}
return params
monkeypatch.setattr(common, 'lookup_revisions', _get_resultset)
2013-11-05 21:09:26 +04:00
2014-06-02 23:15:13 +04:00
@pytest.fixture(scope='function')
def refdata(request):
2013-11-05 21:09:26 +04:00
"""returns a patched RefDataManager for testing purpose"""
from treeherder.model.derived import RefDataManager
from tests.conftest import add_test_procs_file
refdata = RefDataManager()
proc_path = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
'refdata_test.json'
2013-11-05 21:09:26 +04:00
)
add_test_procs_file(refdata.dhub, 'reference', proc_path)
2014-06-02 07:00:52 +04:00
def fin():
refdata.disconnect()
request.addfinalizer(fin)
2013-11-05 21:09:26 +04:00
return refdata
2014-03-13 18:17:34 +04:00
2014-03-13 18:17:34 +04:00
@pytest.fixture
def mock_message_broker(monkeypatch):
from django.conf import settings
2014-06-02 07:00:52 +04:00
monkeypatch.setattr(settings, 'BROKER_URL', 'memory://')
2014-06-03 20:46:20 +04:00
@pytest.fixture
def resultset_with_three_jobs(jm, sample_data, sample_resultset):
"""
Stores a number of jobs in the same resultset.
"""
num_jobs = 3
resultset = sample_resultset[0]
jobs = sample_data.job_data[0:num_jobs]
# Only store data for the first resultset....
resultset_creation = jm.store_result_set_data([resultset])
blobs = []
for index, blob in enumerate(jobs):
# Modify job structure to sync with the resultset sample data
if 'sources' in blob:
del blob['sources']
# Skip log references since they do not work correctly in pending state.
if 'log_references' in blob['job']:
del blob['job']['log_references']
blob['revision_hash'] = resultset['revision_hash']
blob['job']['state'] = 'pending'
blobs.append(blob)
# Store and process the jobs so they are present in the tables.
jm.store_job_data(blobs)
return resultset_creation['inserted_result_set_ids'][0]
2014-06-03 20:46:20 +04:00
@pytest.fixture
def eleven_jobs_stored(jm, sample_data, sample_resultset, mock_log_parser):
2014-06-03 20:46:20 +04:00
"""stores a list of 11 job samples"""
jm.store_result_set_data(sample_resultset)
num_jobs = 11
jobs = sample_data.job_data[0:num_jobs]
max_index = len(sample_resultset) - 1
resultset_index = 0
blobs = []
for index, blob in enumerate(jobs):
if resultset_index > max_index:
resultset_index = 0
# Modify job structure to sync with the resultset sample data
if 'sources' in blob:
del blob['sources']
blob['revision_hash'] = sample_resultset[resultset_index]['revision_hash']
blobs.append(blob)
resultset_index += 1
jm.store_job_data(blobs)
2014-07-03 17:56:45 +04:00
@pytest.fixture
def set_oauth_credentials():
OAuthCredentials.set_credentials(SampleData.get_credentials())
2014-07-03 17:56:45 +04:00
@pytest.fixture
def mock_post_json(monkeypatch, client_credentials):
def _post_json(th_client, project, endpoint, data,
timeout=None, auth=None):
2014-07-03 17:56:45 +04:00
auth = auth or th_client.auth
if not auth:
auth = HawkAuth(credentials={
'id': client_credentials.client_id,
'key': str(client_credentials.secret),
'algorithm': 'sha256'
})
app = TestApp(application)
uri = th_client._get_project_uri(project, endpoint)
req = Request('POST', uri, json=data, auth=auth)
prepped_request = req.prepare()
getattr(app, 'post')(
prepped_request.url,
params=json.dumps(data),
content_type='application/json',
extra_environ={
'HTTP_AUTHORIZATION': str(prepped_request.headers['Authorization'])
}
2014-07-03 17:56:45 +04:00
)
monkeypatch.setattr(TreeherderClient, '_post_json', _post_json)
2014-07-16 20:39:31 +04:00
2014-07-16 20:56:13 +04:00
@pytest.fixture
def mock_get_remote_content(monkeypatch):
def _get_remote_content(url, params=None):
response = TestApp(application).get(url, params=params, status=200)
return response.json
2014-07-16 20:56:13 +04:00
import treeherder.etl.common
monkeypatch.setattr(treeherder.etl.common,
'get_remote_content', _get_remote_content)
@pytest.fixture
def activate_responses(request):
responses.start()
def fin():
responses.reset()
responses.stop()
request.addfinalizer(fin)
def pulse_consumer(exchange, request):
from django.conf import settings
exchange_name = 'exchange/{}/v1/{}'.format(
settings.PULSE_EXCHANGE_NAMESPACE,
exchange
)
connection = kombu.Connection(settings.PULSE_URI)
exchange = kombu.Exchange(
name=exchange_name,
type='topic'
)
queue = kombu.Queue(
no_ack=True,
exchange=exchange, # Exchange name
routing_key='#', # Bind to all messages
auto_delete=True, # Delete after each test
exclusive=False) # Disallow multiple consumers
simpleQueue = connection.SimpleQueue(
name=queue,
channel=connection,
no_ack=True)
def fin():
connection.release()
request.addfinalizer(fin)
return simpleQueue
@pytest.fixture
def pulse_resultset_consumer(request):
return pulse_consumer('new-result-set', request)
@pytest.fixture
def pulse_action_consumer(request):
return pulse_consumer('job-actions', request)
@pytest.fixture
def mock_error_summary(monkeypatch):
bs_obj = ["foo", "bar"]
from treeherder.model import error_summary
def _get_error_summary(params):
return bs_obj
monkeypatch.setattr(error_summary, "get_error_summary", _get_error_summary)
return bs_obj
@pytest.fixture
def failure_lines(jm, test_repository, eleven_jobs_stored, initial_data):
from tests.autoclassify.utils import test_line, create_failure_lines
test_repository.save()
job = jm.get_job(1)[0]
return create_failure_lines(test_repository,
job["job_guid"],
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
@pytest.fixture
def classified_failures(request, jm, eleven_jobs_stored, initial_data, failure_lines):
from treeherder.model.models import ClassifiedFailure, FailureMatch, MatcherManager
from treeherder.autoclassify import detectors
job_1 = jm.get_job(1)[0]
class TreeherderUnitTestDetector(detectors.Detector):
def __call__(self, failure_lines):
pass
test_matcher = MatcherManager._detector_funcs = {}
test_matcher = MatcherManager._matcher_funcs = {}
test_matcher = MatcherManager.register_detector(TreeherderUnitTestDetector)
def finalize():
MatcherManager._detector_funcs = {}
MatcherManager._matcher_funcs = {}
request.addfinalizer(finalize)
classified_failures = []
for failure_line in failure_lines:
if failure_line.job_guid == job_1["job_guid"]:
classified_failure = ClassifiedFailure()
classified_failure.save()
match = FailureMatch(failure_line=failure_line,
classified_failure=classified_failure,
matcher=test_matcher.db_object,
score=1.0,
is_best=True)
match.save()
classified_failures.append(classified_failure)
return classified_failures
@pytest.fixture
def retriggers(jm, eleven_jobs_stored):
original = jm.get_job(2)[0]
retrigger = original.copy()
retrigger['job_guid'] = "f1c75261017c7c5ce3000931dce4c442fe0a1298"
jm.execute(proc="jobs_test.inserts.duplicate_job",
placeholders=[retrigger['job_guid'], original['job_guid']])
return [retrigger]
@pytest.fixture
def api_user(request):
from django.contrib.auth.models import User
user = User.objects.create_user('MyUser')
def fin():
user.delete()
request.addfinalizer(fin)
return user
@pytest.fixture
def client_credentials(request, api_user):
from django.conf import settings
from treeherder.credentials.models import Credentials
# We need to get_or_create here because of bug 1133273.
# It can be a straight create once that bug is solved.
client_credentials, _ = Credentials.objects.get_or_create(
client_id=settings.ETL_CLIENT_ID,
defaults={'owner': api_user, 'authorized': True}
)
def fin():
client_credentials.delete()
request.addfinalizer(fin)
return client_credentials