treeherder/tests/conftest.py

714 строки
21 KiB
Python
Исходник Обычный вид История

import copy
import datetime
import json
import os
2014-07-03 17:56:45 +04:00
import kombu
2013-03-19 23:03:26 +04:00
import pytest
import responses
from _pytest.monkeypatch import MonkeyPatch
from django.conf import settings
from rest_framework.test import APIClient
2014-07-03 17:56:45 +04:00
from treeherder.autoclassify.autoclassify import mark_best_classification
2017-01-06 01:55:33 +03:00
from treeherder.etl.jobs import store_job_data
from treeherder.etl.push import store_push_data
2018-01-05 16:50:12 +03:00
from treeherder.model.models import (Commit,
JobNote,
Push,
TextLogErrorMetadata,
User)
from treeherder.perf.models import (IssueTracker,
PerformanceAlert,
PerformanceAlertSummary,
PerformanceDatum,
PerformanceFramework,
PerformanceSignature)
from treeherder.services.pulse.exchange import get_exchange
2014-07-03 17:56:45 +04:00
def pytest_addoption(parser):
parser.addoption(
"--runslow",
action="store_true",
help="run slow tests",
)
def pytest_runtest_setup(item):
"""
2013-04-16 22:00:34 +04:00
Per-test setup.
- Add an option to run those tests marked as 'slow'
- Clear the django cache between runs
2013-04-16 22:00:34 +04:00
"""
if 'slow' in item.keywords and not item.config.getoption("--runslow"):
pytest.skip("need --runslow option to run")
from django.core.cache import cache
cache.clear()
2013-04-19 03:23:58 +04:00
@pytest.fixture(scope="session", autouse=True)
def block_unmocked_requests():
"""
Prevents requests from being made unless they are mocked.
Helps avoid inadvertent dependencies on external resources during the test run.
"""
def mocked_send(*args, **kwargs):
raise RuntimeError('Tests must mock all HTTP requests!')
# The standard monkeypatch fixture cannot be used with session scope:
# https://github.com/pytest-dev/pytest/issues/363
monkeypatch = MonkeyPatch()
# Monkeypatching here since any higher level would break responses:
# https://github.com/getsentry/responses/blob/0.5.1/responses.py#L295
monkeypatch.setattr('requests.adapters.HTTPAdapter.send', mocked_send)
yield monkeypatch
monkeypatch.undo()
@pytest.fixture
def elasticsearch(request):
from treeherder.services.elasticsearch import reinit_index, refresh_index
if not settings.ELASTICSEARCH_URL:
return
reinit_index()
refresh_index()
@pytest.fixture
2013-04-19 03:23:58 +04:00
def sample_data():
"""Returns a SampleData() object"""
from .sampledata import SampleData
2013-04-19 03:23:58 +04:00
return SampleData()
2013-09-04 18:38:59 +04:00
@pytest.fixture(scope='session')
def test_base_dir():
return os.path.dirname(__file__)
@pytest.fixture
def sample_push(sample_data):
return copy.deepcopy(sample_data.push_data)
@pytest.fixture(name='create_push')
def fixture_create_push():
"""Return a function to create a push"""
def create(repository,
revision='4c45a777949168d16c03a4cba167678b7ab65f76',
author='foo@bar.com'):
return Push.objects.create(
repository=repository,
revision=revision,
author=author,
time=datetime.datetime.now())
return create
@pytest.fixture(name='create_commit')
def fixture_create_commit():
"""Return a function to create a commit"""
def create(push, comments='Bug 12345 - This is a message'):
return Commit.objects.create(
push=push,
revision=push.revision,
author=push.author,
comments=comments)
return create
@pytest.fixture
def test_repository(transactional_db):
from treeherder.model.models import Repository, RepositoryGroup
RepositoryGroup.objects.create(
name="development",
description=""
)
r = Repository.objects.create(
dvcs_type="hg",
name=settings.TREEHERDER_TEST_REPOSITORY_NAME,
url="https://hg.mozilla.org/mozilla-central",
active_status="active",
codebase="gecko",
repository_group_id=1,
description="",
performance_alerts_enabled=True,
expire_performance_data=False
)
return r
@pytest.fixture
def test_issue_tracker(transactional_db):
return IssueTracker.objects.create(
name="Bugzilla",
task_base_url="https://bugzilla.mozilla.org/show_bug.cgi?id="
)
@pytest.fixture
def test_repository_2(test_repository):
from treeherder.model.models import Repository
return Repository.objects.create(
repository_group=test_repository.repository_group,
name=test_repository.name + '_2',
dvcs_type=test_repository.dvcs_type,
url=test_repository.url + '_2',
codebase=test_repository.codebase)
2018-01-05 16:50:12 +03:00
@pytest.fixture
def test_push(create_push, test_repository):
return create_push(test_repository)
2018-01-05 16:50:12 +03:00
@pytest.fixture
def test_commit(create_commit, test_push):
return create_commit(test_push)
2018-01-05 16:50:12 +03:00
@pytest.fixture(name='create_jobs')
def fixture_create_jobs(test_repository, failure_classifications):
"""Return a function to create jobs"""
from treeherder.model.models import Job
def create(jobs):
store_job_data(test_repository, jobs)
return [Job.objects.get(id=i) for i in range(1, len(jobs) + 1)]
return create
@pytest.fixture
def test_job(eleven_job_blobs, create_jobs):
return create_jobs(eleven_job_blobs[0:1])[0]
@pytest.fixture
def test_job_2(eleven_job_blobs, create_jobs):
return create_jobs(eleven_job_blobs[0:2])[1]
@pytest.fixture
def mock_log_parser(monkeypatch):
from celery import task
from treeherder.log_parser import tasks
@task
def task_mock(*args, **kwargs):
pass
monkeypatch.setattr(tasks, 'parse_logs', task_mock)
@pytest.fixture
def push_stored(test_repository, sample_push):
store_push_data(test_repository, sample_push)
2013-11-05 21:09:26 +04:00
return sample_push
2014-06-03 20:46:20 +04:00
@pytest.fixture
def eleven_job_blobs(sample_data, sample_push, test_repository, mock_log_parser):
store_push_data(test_repository, sample_push)
2014-06-03 20:46:20 +04:00
num_jobs = 11
jobs = sample_data.job_data[0:num_jobs]
max_index = len(sample_push) - 1
push_index = 0
2014-06-03 20:46:20 +04:00
blobs = []
for blob in jobs:
2014-06-03 20:46:20 +04:00
if push_index > max_index:
push_index = 0
2014-06-03 20:46:20 +04:00
# Modify job structure to sync with the push sample data
2014-06-03 20:46:20 +04:00
if 'sources' in blob:
del blob['sources']
blob['revision'] = sample_push[push_index]['revision']
2014-06-03 20:46:20 +04:00
blobs.append(blob)
push_index += 1
return blobs
2014-06-03 20:46:20 +04:00
@pytest.fixture
2017-01-06 01:55:33 +03:00
def eleven_jobs_stored(test_repository, failure_classifications, eleven_job_blobs):
"""stores a list of 11 job samples"""
2017-01-06 01:55:33 +03:00
store_job_data(test_repository, eleven_job_blobs)
2014-06-03 20:46:20 +04:00
@pytest.fixture
def taskcluster_jobs_stored(test_repository, sample_data):
"""stores a list of TaskCluster job samples"""
store_job_data(test_repository, sample_data.transformed_pulse_jobs)
@pytest.fixture
def test_job_with_notes(test_job, test_user):
"""test job with job notes."""
2016-10-03 19:56:51 +03:00
for failure_classification_id in [2, 3]:
JobNote.objects.create(job=test_job,
2016-10-03 19:56:51 +03:00
failure_classification_id=failure_classification_id,
user=test_user,
text="you look like a man-o-lantern")
test_job.refresh_from_db()
return test_job
@pytest.fixture
def activate_responses(request):
responses.start()
def fin():
responses.reset()
responses.stop()
request.addfinalizer(fin)
@pytest.fixture
def pulse_connection():
"""
Build a Pulse connection with the Kombu library
This is a non-lazy mirror of our Pulse service's build_connection as
explained in: https://bugzilla.mozilla.org/show_bug.cgi?id=1484196
"""
Bug 1337717 - Update to newer Celery, Kombu, py-amqp and billiard (#4722) Updating in one go, since the Celery 4 release only supports the newer versions of its dependencies and vice versa. Of note, this fixes the unhelpful connection error messages shown when in fact there was an authentication problem, and brings Celery/Kombu support for Python 3.7. It's also likely that this will fix the pulse listener hang seen in bug 1529404. The new Celery release has renamed a number of the settings. Most changes were performed by running: ``` celery upgrade settings treeherder/config/settings.py --django celery upgrade settings tests/settings.py --django ``` The Django integration in celery.py has been cleaned up by following: https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html The bug being hit that caused this to be reverted back in #2119/bug 1333079 was due to Celery 4 no longer supporting calling `apply_async()` with just the `routing_key` - it now has to be called with either just the `queue`, or else both the `routing_key` and `queue`, otherwise the task ends up in the `default` queue. Sadly this isn't mentioned in the Celery breaking changes list - I'll file an upstream issue shortly. Changes: http://docs.celeryproject.org/en/master/history/whatsnew-4.0.html http://docs.celeryproject.org/en/master/changelog.html#rc1 https://github.com/celery/celery/compare/v3.1.26...v4.3.0rc1 http://docs.celeryproject.org/projects/kombu/en/stable/changelog.html#version-4-3-0 https://github.com/celery/kombu/compare/v3.0.37...v4.3.0 https://amqp.readthedocs.io/en/stable/changelog.html https://github.com/celery/py-amqp/compare/v1.4.9...v2.4.1 https://github.com/celery/billiard/blob/v3.6.0/CHANGES.txt https://github.com/celery/billiard/compare/v3.3.0.23...v3.6.0
2019-02-28 22:52:22 +03:00
return kombu.Connection(settings.CELERY_BROKER_URL)
@pytest.fixture
def pulse_exchange(pulse_connection, request):
def build_exchange(name, create_exchange):
return get_exchange(pulse_connection, name, create=create_exchange)
return build_exchange
@pytest.fixture
def failure_lines(test_job, elasticsearch):
from tests.autoclassify.utils import test_line, create_failure_lines
return create_failure_lines(test_job,
[(test_line, {}),
(test_line, {"subtest": "subtest2"})])
@pytest.fixture
def failure_classifications(transactional_db):
from treeherder.model.models import FailureClassification
for name in ["not classified", "fixed by commit", "expected fail",
"intermittent", "infra", "intermittent needs filing",
"autoclassified intermittent"]:
FailureClassification(name=name).save()
@pytest.fixture
def text_log_errors_failure_lines(test_job, failure_lines):
from tests.autoclassify.utils import test_line, create_text_log_errors
lines = [(test_line, {}),
(test_line, {"subtest": "subtest2"})]
text_log_errors = create_text_log_errors(test_job, lines)
for error_line, failure_line in zip(text_log_errors, failure_lines):
TextLogErrorMetadata.objects.create(text_log_error=error_line,
failure_line=failure_line)
return text_log_errors, failure_lines
@pytest.fixture
def test_matcher(request):
return "TreeherderUnitTestDetector"
@pytest.fixture
def classified_failures(test_job, text_log_errors_failure_lines, test_matcher,
failure_classifications):
from treeherder.model.models import ClassifiedFailure
from treeherder.services.elasticsearch import refresh_index
_, failure_lines = text_log_errors_failure_lines
classified_failures = []
for failure_line in failure_lines:
if failure_line.job_guid == test_job.guid:
classified_failure = ClassifiedFailure.objects.create()
failure_line.error.create_match(test_matcher, classified_failure)
mark_best_classification(failure_line.error, classified_failure)
classified_failures.append(classified_failure)
if settings.ELASTICSEARCH_URL:
refresh_index()
return classified_failures
@pytest.fixture
def test_user(db):
# a user *without* sheriff/staff permissions
user = User.objects.create(username="testuser1",
email='user@foo.com',
is_staff=False)
return user
@pytest.fixture
def test_ldap_user(db):
"""
A user whose username matches those generated for LDAP SSO logins,
and who does not have `is_staff` permissions.
"""
user = User.objects.create(username="mozilla-ldap/user@foo.com",
email='user@foo.com',
is_staff=False)
return user
@pytest.fixture
def test_sheriff(db):
# a user *with* sheriff/staff permissions
user = User.objects.create(username="testsheriff1",
email='sheriff@foo.com',
is_staff=True)
return user
@pytest.fixture
def test_perf_framework(transactional_db):
return PerformanceFramework.objects.create(
name='test_talos', enabled=True)
@pytest.fixture
def test_perf_signature(test_repository, test_perf_framework):
from treeherder.model.models import (MachinePlatform,
Option,
OptionCollection)
option = Option.objects.create(name='opt')
option_collection = OptionCollection.objects.create(
option_collection_hash='my_option_hash',
option=option)
platform = MachinePlatform.objects.create(
os_name='win',
platform='win7',
architecture='x86')
signature = PerformanceSignature.objects.create(
repository=test_repository,
signature_hash=(40*'t'),
framework=test_perf_framework,
platform=platform,
option_collection=option_collection,
suite='mysuite',
test='mytest',
has_subtests=False,
extra_options='e10s opt',
last_updated=datetime.datetime.now()
)
return signature
@pytest.fixture
def test_perf_signature_2(test_perf_signature):
return PerformanceSignature.objects.create(
repository=test_perf_signature.repository,
signature_hash=(20*'t2'),
framework=test_perf_signature.framework,
platform=test_perf_signature.platform,
option_collection=test_perf_signature.option_collection,
suite='mysuite2',
test='mytest2',
has_subtests=test_perf_signature.has_subtests,
extra_options=test_perf_signature.extra_options,
last_updated=datetime.datetime.now()
)
@pytest.fixture
def test_perf_data(test_perf_signature, eleven_jobs_stored):
from treeherder.model.models import Job
def normalized_time(hour):
return datetime.datetime(2018, 7, 3, hour)
# for making things easier, ids for jobs
# and push should be the same;
# also, we only need a subset of jobs
perf_jobs = Job.objects.filter(pk__in=range(7, 11)).order_by('push__time').all()
for index, job in enumerate(perf_jobs, start=1):
job.push_id = index
job.save()
perf_datum = PerformanceDatum.objects.create(
value=10,
push_timestamp=normalized_time(index),
job=job,
push=job.push,
repository=job.repository,
signature=test_perf_signature
)
perf_datum.push.time = normalized_time(index)
perf_datum.push.save()
return PerformanceDatum.objects.order_by('id').all()
@pytest.fixture
def mock_autoclassify_jobs_true(monkeypatch):
monkeypatch.setattr(settings, 'AUTOCLASSIFY_JOBS', True)
@pytest.fixture
def mock_bugzilla_api_request(monkeypatch):
"""Mock fetch_json() used by Bugzilla ETL to return a local sample file."""
import treeherder.etl.bugzilla
def _fetch_json(url, params=None):
tests_folder = os.path.dirname(__file__)
bug_list_path = os.path.join(
tests_folder,
"sample_data",
"bug_list.json"
)
with open(bug_list_path) as f:
return json.load(f)
monkeypatch.setattr(treeherder.etl.bugzilla,
'fetch_json',
_fetch_json)
@pytest.fixture
def bugs(mock_bugzilla_api_request):
from treeherder.etl.bugzilla import BzApiBugProcess
from treeherder.model.models import Bugscache
process = BzApiBugProcess()
process.run()
return Bugscache.objects.all()
@pytest.fixture
def client():
"""
A django-rest-framework APIClient instance:
http://www.django-rest-framework.org/api-guide/testing/#apiclient
"""
return APIClient()
@pytest.fixture
def authorized_sheriff_client(client, test_sheriff):
client.force_authenticate(user=test_sheriff)
return client
@pytest.fixture
def text_log_error_lines(test_job, failure_lines):
from tests.autoclassify.utils import create_text_log_errors
from treeherder.model.models import FailureLine
lines = [(item, {}) for item in FailureLine.objects.filter(job_guid=test_job.guid).values()]
errors = create_text_log_errors(test_job, lines)
return errors
@pytest.fixture
def test_perf_alert_summary(test_repository, push_stored, test_perf_framework, test_issue_tracker):
return PerformanceAlertSummary.objects.create(
repository=test_repository,
framework=test_perf_framework,
prev_push_id=1,
push_id=2,
manually_created=False,
created=datetime.datetime.now())
@pytest.fixture
def test_perf_alert_summary_2(test_perf_alert_summary):
return PerformanceAlertSummary.objects.create(
repository=test_perf_alert_summary.repository,
framework=test_perf_alert_summary.framework,
prev_push_id=test_perf_alert_summary.prev_push_id+1,
push_id=test_perf_alert_summary.push_id+1,
manually_created=False,
created=datetime.datetime.now())
@pytest.fixture
def test_perf_alert(test_perf_signature, test_perf_alert_summary):
return PerformanceAlert.objects.create(
summary=test_perf_alert_summary,
series_signature=test_perf_signature,
is_regression=True,
amount_pct=0.5,
amount_abs=50.0,
prev_value=100.0,
new_value=150.0,
t_value=20.0)
@pytest.fixture
def test_conflicting_perf_alert(test_perf_signature, test_perf_alert_summary_2):
return PerformanceAlert.objects.create(
summary=test_perf_alert_summary_2,
series_signature=test_perf_signature,
is_regression=True,
amount_pct=0.5,
amount_abs=50.0,
prev_value=100.0,
new_value=150.0,
t_value=20.0)
@pytest.fixture
def test_perf_alert_2(test_perf_alert, test_perf_signature_2, test_perf_alert_summary_2):
return PerformanceAlert.objects.create(
summary=test_perf_alert_summary_2,
series_signature=test_perf_signature_2,
is_regression=True,
amount_pct=0.5,
amount_abs=50.0,
prev_value=100.0,
new_value=150.0,
t_value=20.0)
@pytest.fixture
def generic_reference_data(test_repository):
'''
Generic reference data (if you want to create a bunch of mock jobs)
'''
from treeherder.model.models import (BuildPlatform,
JobGroup,
JobType,
Machine,
MachinePlatform,
Option,
OptionCollection,
Product,
ReferenceDataSignatures)
class RefdataHolder:
pass
r = RefdataHolder()
r.option = Option.objects.create(name='my_option')
r.option_collection = OptionCollection.objects.create(
option_collection_hash='my_option_hash',
option=r.option)
r.option_collection_hash = r.option_collection.option_collection_hash
r.machine_platform = MachinePlatform.objects.create(
os_name="my_os",
platform="my_platform",
architecture="x86")
r.build_platform = BuildPlatform.objects.create(
os_name="my_os",
platform="my_platform",
architecture="x86")
r.machine = Machine.objects.create(name='mymachine')
r.job_group = JobGroup.objects.create(symbol='S', name='myjobgroup')
r.job_type = JobType.objects.create(symbol='j', name='myjob')
r.product = Product.objects.create(name='myproduct')
r.signature = ReferenceDataSignatures.objects.create(
name='myreferencedatasignaeture',
signature='1234',
build_os_name=r.build_platform.os_name,
build_platform=r.build_platform.platform,
build_architecture=r.build_platform.architecture,
machine_os_name=r.machine_platform.os_name,
machine_platform=r.machine_platform.platform,
machine_architecture=r.machine_platform.architecture,
job_group_name=r.job_group.name,
job_group_symbol=r.job_group.symbol,
job_type_name=r.job_type.name,
job_type_symbol=r.job_type.symbol,
option_collection_hash=r.option_collection_hash,
build_system_type='buildbot',
repository=test_repository.name,
first_submission_timestamp=0)
return r
@pytest.fixture
def bug_data(eleven_jobs_stored, test_repository, test_push, bugs):
from treeherder.model.models import (Job,
BugJobMap,
Option)
jobs = Job.objects.all()
bug_id = bugs[0].id
job_id = jobs[0].id
BugJobMap.create(job_id=job_id, bug_id=bug_id)
query_string = '?startday=2012-05-09&endday=2018-05-10&tree={}'.format(
test_repository.name)
return {
'tree': test_repository.name,
'option': Option.objects.first(),
'bug_id': bug_id,
'job': jobs[0],
'jobs': jobs,
'query_string': query_string
}
@pytest.fixture
def test_run_data(bug_data):
pushes = Push.objects.all()
time = pushes[0].time.strftime('%Y-%m-%d')
test_runs = 0
for push in list(pushes):
if push.time.strftime('%Y-%m-%d') == time:
test_runs += 1
return {
'test_runs': test_runs,
'push_time': time
}
@pytest.fixture
def generate_enough_perf_datum(test_repository, test_perf_signature):
# generate enough data for a proper alert to be generated (with enough
# extra data on both sides to make sure we're using the proper values
# to generate the actual alert)
for (push_id, job_id, value) in zip([1] * 30 + [2] * 30,
range(1, 61),
[1] * 30 + [2] * 30):
# push_id == result_set_id == timestamp for purposes of this test
push = Push.objects.get(id=push_id)
PerformanceDatum.objects.create(repository=test_repository,
result_set_id=push_id,
push_id=push_id,
signature=test_perf_signature,
value=value,
push_timestamp=push.time)