зеркало из https://github.com/mozilla/treeherder.git
Bug 1311185 - Turn off remaining result set ingestion and use
This commit is contained in:
Родитель
5c3344f708
Коммит
1c0c32cec0
|
@ -190,24 +190,6 @@ def result_set_stored(jm, sample_resultset):
|
|||
return sample_resultset
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def mock_get_resultset(monkeypatch, result_set_stored):
|
||||
from treeherder.etl import common
|
||||
|
||||
def _get_resultset(params):
|
||||
for k in params:
|
||||
rev = params[k][0]
|
||||
params[k] = {
|
||||
rev: {
|
||||
'id': 1,
|
||||
'revision': result_set_stored[0]['revision']
|
||||
}
|
||||
}
|
||||
return params
|
||||
|
||||
monkeypatch.setattr(common, 'lookup_revisions', _get_resultset)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_message_broker(monkeypatch):
|
||||
from django.conf import settings
|
||||
|
|
|
@ -87,9 +87,9 @@ def mock_buildapi_builds4h_missing_branch_url(activate_responses):
|
|||
|
||||
|
||||
def test_ingest_pending_jobs(jm,
|
||||
result_set_stored,
|
||||
mock_buildapi_pending_url,
|
||||
mock_log_parser,
|
||||
mock_get_resultset):
|
||||
mock_log_parser):
|
||||
"""
|
||||
a new buildapi pending job creates a new obj in the job table
|
||||
"""
|
||||
|
@ -107,9 +107,9 @@ def test_ingest_pending_jobs(jm,
|
|||
|
||||
|
||||
def test_ingest_running_jobs(jm,
|
||||
result_set_stored,
|
||||
mock_buildapi_running_url,
|
||||
mock_log_parser,
|
||||
mock_get_resultset):
|
||||
mock_log_parser):
|
||||
"""
|
||||
a new buildapi running job creates a new obj in the job table
|
||||
"""
|
||||
|
@ -127,9 +127,9 @@ def test_ingest_running_jobs(jm,
|
|||
|
||||
|
||||
def test_ingest_builds4h_jobs(jm,
|
||||
result_set_stored,
|
||||
mock_buildapi_builds4h_url,
|
||||
mock_log_parser,
|
||||
mock_get_resultset):
|
||||
mock_log_parser):
|
||||
"""
|
||||
a new buildapi completed job creates a new obj in the job table
|
||||
"""
|
||||
|
@ -147,10 +147,10 @@ def test_ingest_builds4h_jobs(jm,
|
|||
|
||||
|
||||
def test_ingest_running_to_complete_job(jm,
|
||||
result_set_stored,
|
||||
mock_buildapi_running_url,
|
||||
mock_buildapi_builds4h_url,
|
||||
mock_log_parser,
|
||||
mock_get_resultset):
|
||||
mock_log_parser):
|
||||
"""
|
||||
a new buildapi running job transitions to a new completed job
|
||||
|
||||
|
@ -178,9 +178,9 @@ def test_ingest_running_to_complete_job(jm,
|
|||
|
||||
|
||||
def test_ingest_running_job_fields(jm,
|
||||
result_set_stored,
|
||||
mock_buildapi_running_url,
|
||||
mock_log_parser,
|
||||
mock_get_resultset):
|
||||
mock_log_parser):
|
||||
"""
|
||||
a new buildapi running job creates a new obj in the job table
|
||||
"""
|
||||
|
@ -194,8 +194,9 @@ def test_ingest_running_job_fields(jm,
|
|||
|
||||
|
||||
def test_ingest_builds4h_jobs_1_missing_resultset(jm,
|
||||
sample_resultset, mock_buildapi_builds4h_missing1_url,
|
||||
mock_log_parser, mock_get_resultset):
|
||||
result_set_stored,
|
||||
mock_buildapi_builds4h_missing1_url,
|
||||
mock_log_parser):
|
||||
"""
|
||||
Ensure the builds4h job with the missing resultset is not ingested
|
||||
"""
|
||||
|
@ -207,8 +208,9 @@ def test_ingest_builds4h_jobs_1_missing_resultset(jm,
|
|||
|
||||
|
||||
def test_ingest_builds4h_jobs_missing_branch(jm,
|
||||
sample_resultset, mock_buildapi_builds4h_missing_branch_url,
|
||||
mock_log_parser, mock_get_resultset):
|
||||
result_set_stored,
|
||||
mock_buildapi_builds4h_missing_branch_url,
|
||||
mock_log_parser):
|
||||
"""
|
||||
Ensure the builds4h job with the missing branch is not ingested
|
||||
"""
|
||||
|
|
|
@ -3,12 +3,12 @@ import copy
|
|||
import pytest
|
||||
|
||||
from treeherder.etl.job_loader import (JobLoader,
|
||||
MissingResultsetException)
|
||||
from treeherder.model.derived import DatasetNotFoundError
|
||||
MissingPushException)
|
||||
from treeherder.model.models import (Job,
|
||||
JobDetail,
|
||||
JobLog,
|
||||
Push)
|
||||
Push,
|
||||
Repository)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -88,7 +88,7 @@ def test_ingest_pulse_jobs_bad_project(pulse_jobs, test_project, jm, result_set_
|
|||
job["origin"]["revision"] = revision
|
||||
job["origin"]["project"] = "ferd"
|
||||
|
||||
with pytest.raises(DatasetNotFoundError):
|
||||
with pytest.raises(Repository.DoesNotExist):
|
||||
jl.process_job_list(pulse_jobs)
|
||||
|
||||
|
||||
|
@ -121,7 +121,7 @@ def test_ingest_pulse_jobs_with_missing_resultset(pulse_jobs):
|
|||
job = pulse_jobs[0]
|
||||
job["origin"]["revision"] = "1234567890123456789012345678901234567890"
|
||||
|
||||
with pytest.raises(MissingResultsetException):
|
||||
with pytest.raises(MissingPushException):
|
||||
jl.process_job_list(pulse_jobs)
|
||||
|
||||
# if one job isn't ready, except on the whole batch. They'll retry as a
|
||||
|
|
|
@ -6,7 +6,6 @@ import time
|
|||
import pytest
|
||||
|
||||
from treeherder.etl.perf import load_perf_artifacts
|
||||
from treeherder.model.derived import JobsModel
|
||||
from treeherder.model.models import (MachinePlatform,
|
||||
Option,
|
||||
OptionCollection,
|
||||
|
@ -36,21 +35,12 @@ def perf_platform():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def perf_push(jm):
|
||||
jm.store_result_set_data([{
|
||||
'revision': '1234abcd',
|
||||
'push_timestamp': int(time.time()),
|
||||
'author': 'foo@bar.com',
|
||||
'revisions': []
|
||||
}])
|
||||
return Push.objects.get(id=1)
|
||||
# FIXME: Delete above and switch to this when we've finished
|
||||
# migrating away from resultsets
|
||||
# return Push.objects.create(
|
||||
# repository=test_repository,
|
||||
# revision='1234abcd',
|
||||
# author='foo@bar.com',
|
||||
# time=datetime.datetime.now())
|
||||
def perf_push(test_repository):
|
||||
return Push.objects.create(
|
||||
repository=test_repository,
|
||||
revision='1234abcd',
|
||||
author='foo@bar.com',
|
||||
time=datetime.datetime.now())
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -58,8 +48,7 @@ def perf_job_data(perf_push):
|
|||
return {
|
||||
'fake_job_guid': {
|
||||
'id': 1,
|
||||
'push_id': 1,
|
||||
'result_set_id': 1
|
||||
'push_id': 1
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,24 +79,14 @@ def _generate_perf_data_range(test_project, test_repository,
|
|||
now = int(time.time())
|
||||
|
||||
for (i, value) in zip(range(30), [1]*15 + [2]*15):
|
||||
with JobsModel(test_repository.name) as jm:
|
||||
jm.store_result_set_data([{
|
||||
'revision': 'abcdefgh%s' % i,
|
||||
'push_timestamp': now + i,
|
||||
'author': 'foo@bar.com',
|
||||
'revisions': []
|
||||
}])
|
||||
# FIXME: delete above and switch to this when we're no longer using
|
||||
# result sets
|
||||
# push = Push.objects.create(repository=test_repository,
|
||||
# revision='abcdefgh%s' % i,
|
||||
# author='foo@bar.com',
|
||||
# time=datetime.datetime.fromtimestamp(now+i))
|
||||
Push.objects.create(repository=test_repository,
|
||||
revision='abcdefgh%s' % i,
|
||||
author='foo@bar.com',
|
||||
time=datetime.datetime.fromtimestamp(now+i))
|
||||
perf_job_data = {
|
||||
'fake_job_guid': {
|
||||
'id': i,
|
||||
'push_id': i + 1,
|
||||
'result_set_id': i + 1,
|
||||
'push_timestamp': now + i
|
||||
}
|
||||
}
|
||||
|
@ -292,23 +271,19 @@ def test_load_generic_data(test_project, test_repository,
|
|||
|
||||
# send another datum, a little later, verify that signature's
|
||||
# `last_updated` is changed accordingly
|
||||
# FIXME: again, we should switch to just creating a Push object when we can
|
||||
later_timestamp = int(time.time()) + 5
|
||||
jm.store_result_set_data([{
|
||||
'revision': '1234abcd12',
|
||||
'push_timestamp': later_timestamp,
|
||||
'author': 'foo@bar.com',
|
||||
'revisions': []
|
||||
}])
|
||||
perf_job_data['fake_job_guid']['push_id'] += 1
|
||||
perf_job_data['fake_job_guid']['result_set_id'] += 1
|
||||
later_timestamp = datetime.datetime.fromtimestamp(int(time.time()) + 5)
|
||||
push = Push.objects.create(
|
||||
repository=test_repository,
|
||||
revision='1234abcd12',
|
||||
author='foo@bar.com',
|
||||
time=later_timestamp)
|
||||
perf_job_data['fake_job_guid']['push_id'] = push.id
|
||||
load_perf_artifacts(test_repository.name, perf_reference_data,
|
||||
perf_job_data, submit_datum)
|
||||
signature = PerformanceSignature.objects.get(
|
||||
suite=perf_datum['suites'][0]['name'],
|
||||
test=perf_datum['suites'][0]['subtests'][0]['name'])
|
||||
assert (signature.last_updated ==
|
||||
datetime.datetime.utcfromtimestamp(later_timestamp))
|
||||
assert signature.last_updated == later_timestamp
|
||||
|
||||
|
||||
def test_no_performance_framework(test_project, test_repository,
|
||||
|
@ -462,9 +437,9 @@ def test_alert_generation(test_project, test_repository,
|
|||
if expected_num_alerts > 0:
|
||||
assert 1 == PerformanceAlertSummary.objects.all().count()
|
||||
summary = PerformanceAlertSummary.objects.get(id=1)
|
||||
assert summary.result_set_id == 16
|
||||
assert summary.result_set_id is None
|
||||
assert summary.push_id == 16
|
||||
assert summary.prev_result_set_id == 15
|
||||
assert summary.prev_result_set_id is None
|
||||
assert summary.prev_push_id == 15
|
||||
else:
|
||||
assert 0 == PerformanceAlertSummary.objects.all().count()
|
||||
|
|
|
@ -6,6 +6,8 @@ from django.conf import settings
|
|||
from django.core.cache import cache
|
||||
|
||||
from treeherder.etl.pushlog import HgPushlogProcess
|
||||
from treeherder.model.models import (Commit,
|
||||
Push)
|
||||
|
||||
|
||||
def test_ingest_hg_pushlog(jm, test_base_dir,
|
||||
|
@ -17,7 +19,6 @@ def test_ingest_hg_pushlog(jm, test_base_dir,
|
|||
with open(pushlog_path) as f:
|
||||
pushlog_content = f.read()
|
||||
pushlog_fake_url = "http://www.thisismypushlog.com"
|
||||
push_num = 10
|
||||
responses.add(responses.GET, pushlog_fake_url,
|
||||
body=pushlog_content, status=200,
|
||||
content_type='application/json')
|
||||
|
@ -26,19 +27,9 @@ def test_ingest_hg_pushlog(jm, test_base_dir,
|
|||
|
||||
process.run(pushlog_fake_url, jm.project)
|
||||
|
||||
pushes_stored = jm.get_dhub().execute(
|
||||
proc="jobs_test.selects.result_set_ids",
|
||||
return_type='tuple'
|
||||
)
|
||||
|
||||
assert len(pushes_stored) == push_num
|
||||
|
||||
revisions_stored = jm.get_dhub().execute(
|
||||
proc="jobs_test.selects.revision_ids",
|
||||
return_type='tuple'
|
||||
)
|
||||
|
||||
assert len(revisions_stored) == 15
|
||||
# should be 10 pushes, 15 revisions
|
||||
assert Push.objects.count() == 10
|
||||
assert Commit.objects.count() == 15
|
||||
|
||||
|
||||
def test_ingest_hg_pushlog_already_stored(jm, test_base_dir,
|
||||
|
@ -66,12 +57,7 @@ def test_ingest_hg_pushlog_already_stored(jm, test_base_dir,
|
|||
process = HgPushlogProcess()
|
||||
process.run(pushlog_fake_url, jm.project)
|
||||
|
||||
pushes_stored = jm.get_dhub().execute(
|
||||
proc="jobs_test.selects.result_set_ids",
|
||||
return_type='tuple'
|
||||
)
|
||||
|
||||
assert len(pushes_stored) == 1
|
||||
assert Push.objects.count() == 1
|
||||
|
||||
# store both first and second push
|
||||
first_and_second_push_json = json.dumps(
|
||||
|
@ -89,12 +75,7 @@ def test_ingest_hg_pushlog_already_stored(jm, test_base_dir,
|
|||
|
||||
process.run(pushlog_fake_url, jm.project)
|
||||
|
||||
pushes_stored = jm.get_dhub().execute(
|
||||
proc="jobs_test.selects.result_set_ids",
|
||||
return_type='tuple'
|
||||
)
|
||||
|
||||
assert len(pushes_stored) == 2
|
||||
assert Push.objects.count() == 2
|
||||
|
||||
|
||||
def test_ingest_hg_pushlog_cache_last_push(jm, test_repository,
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import copy
|
||||
import datetime
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
@ -11,7 +12,8 @@ from tests.autoclassify.utils import (create_failure_lines,
|
|||
from tests.sample_data_generator import (job_data,
|
||||
result_set)
|
||||
from treeherder.model.derived import ArtifactsModel
|
||||
from treeherder.model.models import (ExclusionProfile,
|
||||
from treeherder.model.models import (Commit,
|
||||
ExclusionProfile,
|
||||
FailureLine,
|
||||
Job,
|
||||
JobDetail,
|
||||
|
@ -82,29 +84,26 @@ def test_ingest_twice_log_parsing_status_changed(jm, sample_data,
|
|||
job_log.status == JobLog.FAILED
|
||||
|
||||
|
||||
def test_get_inserted_row_ids(jm, sample_resultset, test_repository):
|
||||
def test_insert_result_sets(jm, sample_resultset, test_repository):
|
||||
|
||||
slice_limit = 8
|
||||
sample_slice = sample_resultset[0:slice_limit]
|
||||
new_id_set = set(range(1, len(sample_slice) + 1))
|
||||
|
||||
data = jm.store_result_set_data(sample_slice)
|
||||
jm.store_result_set_data(sample_slice)
|
||||
|
||||
# Confirm the range of ids matches for the sample_resultset slice
|
||||
assert set(data['inserted_result_set_ids']) == new_id_set
|
||||
assert Push.objects.count() == len(sample_slice)
|
||||
|
||||
second_pass_data = jm.store_result_set_data(sample_slice)
|
||||
jm.store_result_set_data(sample_slice)
|
||||
|
||||
# Confirm if we store the same data twice we don't identify new
|
||||
# result set ids
|
||||
assert second_pass_data['inserted_result_set_ids'] == []
|
||||
assert Push.objects.count() == len(sample_slice)
|
||||
|
||||
third_pass_data = jm.store_result_set_data(sample_resultset)
|
||||
jm.store_result_set_data(sample_resultset)
|
||||
|
||||
# Confirm if we store a mix of new result sets and already stored
|
||||
# result sets we store/identify the new ones
|
||||
assert len(set(third_pass_data['inserted_result_set_ids'])) == \
|
||||
len(sample_resultset) - slice_limit
|
||||
assert Push.objects.count() == len(sample_resultset)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("same_ingestion_cycle", [False, True])
|
||||
|
@ -537,49 +536,37 @@ def test_bad_date_value_ingestion(jm, test_repository, mock_log_parser):
|
|||
# if no exception, we are good.
|
||||
|
||||
|
||||
def test_store_result_set_data(jm, sample_resultset):
|
||||
def test_store_result_set_data(jm, test_repository, sample_resultset):
|
||||
|
||||
data = jm.store_result_set_data(sample_resultset)
|
||||
|
||||
result_set_ids = jm.get_dhub().execute(
|
||||
proc="jobs_test.selects.result_set_ids",
|
||||
key_column='long_revision',
|
||||
return_type='dict'
|
||||
)
|
||||
revision_ids = jm.get_dhub().execute(
|
||||
proc="jobs_test.selects.revision_ids",
|
||||
key_column='revision',
|
||||
return_type='dict'
|
||||
)
|
||||
|
||||
rs_revisions = set()
|
||||
revisions = set()
|
||||
|
||||
for datum in sample_resultset:
|
||||
rs_revisions.add(datum['revision'])
|
||||
for revision in datum['revisions']:
|
||||
revisions.add(revision['revision'])
|
||||
jm.store_result_set_data(sample_resultset)
|
||||
|
||||
# Confirm all of the pushes and revisions in the
|
||||
# sample_resultset have been stored
|
||||
assert {r for r in data['result_set_ids'].keys() if len(r) == 40} == rs_revisions
|
||||
assert set(data['revision_ids'].keys()) == revisions
|
||||
exp_push_revisions = set()
|
||||
exp_commit_revisions = set()
|
||||
for rs in sample_resultset:
|
||||
exp_push_revisions.add(rs['revision'])
|
||||
for rs_revision in rs['revisions']:
|
||||
exp_commit_revisions.add(rs_revision['revision'])
|
||||
|
||||
assert set(Push.objects.values_list('revision', flat=True)) == exp_push_revisions
|
||||
assert set(Commit.objects.values_list('revision', flat=True)) == exp_commit_revisions
|
||||
|
||||
# Confirm the data structures returned match what's stored in
|
||||
# the database
|
||||
for rev in rs_revisions:
|
||||
assert data['result_set_ids'][rev] == result_set_ids[rev]
|
||||
|
||||
assert data['revision_ids'] == revision_ids
|
||||
|
||||
|
||||
def test_store_result_set_revisions(jm, sample_resultset):
|
||||
"""Test that the ``top`` revision stored for resultset is correct"""
|
||||
resultsets = sample_resultset[8:9]
|
||||
jm.store_result_set_data(resultsets)
|
||||
stored = jm.get_dhub().execute(proc="jobs_test.selects.result_sets")[0]
|
||||
assert stored["long_revision"] == "997b28cb87373456789012345678901234567890"
|
||||
assert stored["short_revision"] == "997b28cb8737"
|
||||
for rs in sample_resultset:
|
||||
push = Push.objects.get(
|
||||
repository=test_repository,
|
||||
revision=rs['revision'],
|
||||
author=rs['author'],
|
||||
revision_hash=rs.get('revision_hash', rs['revision']),
|
||||
time=datetime.datetime.fromtimestamp(rs['push_timestamp']))
|
||||
for commit in rs['revisions']:
|
||||
assert Commit.objects.get(
|
||||
push=push,
|
||||
revision=commit['revision'],
|
||||
author=commit['author'],
|
||||
comments=commit['comment'])
|
||||
|
||||
|
||||
def test_get_job_data(jm, test_project, sample_data,
|
||||
|
|
|
@ -9,8 +9,7 @@ from treeherder.perf.models import (PerformanceAlert,
|
|||
PerformanceDatum)
|
||||
|
||||
|
||||
def _verify_alert(alertid, expected_result_set_id,
|
||||
expected_prev_result_set_id,
|
||||
def _verify_alert(alertid, expected_push_id, expected_prev_push_id,
|
||||
expected_signature, expected_prev_value,
|
||||
expected_new_value, expected_is_regression,
|
||||
expected_status, expected_summary_status,
|
||||
|
@ -24,39 +23,33 @@ def _verify_alert(alertid, expected_result_set_id,
|
|||
assert alert.classifier == expected_classifier
|
||||
|
||||
summary = PerformanceAlertSummary.objects.get(id=alertid)
|
||||
assert summary.result_set_id == expected_result_set_id
|
||||
assert summary.prev_result_set_id == expected_prev_result_set_id
|
||||
assert summary.result_set_id is None
|
||||
assert summary.prev_result_set_id is None
|
||||
assert summary.push_id == expected_push_id
|
||||
assert summary.prev_push_id == expected_prev_push_id
|
||||
assert summary.status == expected_summary_status
|
||||
|
||||
|
||||
def _generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
def _generate_performance_data(test_repository, test_perf_signature,
|
||||
base_timestamp, start_id, value, amount):
|
||||
for (t, v) in zip([i for i in range(start_id, start_id + amount)],
|
||||
[value for i in range(start_id, start_id + amount)]):
|
||||
revision = '1234abcd%s' % t
|
||||
jm.store_result_set_data([{
|
||||
'revision': revision,
|
||||
'push_timestamp': int(base_timestamp + t),
|
||||
'author': 'foo@bar.com',
|
||||
'revisions': []
|
||||
}])
|
||||
push, _ = Push.objects.get_or_create(
|
||||
repository=test_repository,
|
||||
revision='1234abcd%s' % t,
|
||||
defaults={
|
||||
'author': 'foo@bar.com',
|
||||
'time': datetime.datetime.fromtimestamp(base_timestamp + t)
|
||||
})
|
||||
job = Job.objects.create(
|
||||
repository=test_repository,
|
||||
guid='abcd%s' % Job.objects.count(),
|
||||
project_specific_id=Job.objects.count(),
|
||||
push=Push.objects.get(repository=test_repository,
|
||||
revision=revision))
|
||||
# FIXME: Delete above and switch to this when we've finished
|
||||
# migrating away from resultsets
|
||||
# return Push.objects.create(
|
||||
# repository=test_repository,
|
||||
# revision='1234abcd',
|
||||
# author='foo@bar.com',
|
||||
# time=datetime.datetime.fromtimestamp(base_timestamp + t))
|
||||
push=push)
|
||||
PerformanceDatum.objects.create(
|
||||
repository=test_repository,
|
||||
result_set_id=t,
|
||||
push_id=t,
|
||||
push=push,
|
||||
job_id=job.id,
|
||||
signature=test_perf_signature,
|
||||
push_timestamp=datetime.datetime.utcfromtimestamp(
|
||||
|
@ -65,13 +58,13 @@ def _generate_performance_data(test_repository, test_perf_signature, jm,
|
|||
|
||||
|
||||
def test_detect_alerts_in_series(test_project, test_repository,
|
||||
test_perf_signature, jm):
|
||||
test_perf_signature):
|
||||
|
||||
base_time = time.time() # generate it based off current time
|
||||
INTERVAL = 30
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 1, 0.5, INTERVAL/2)
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, (INTERVAL/2) + 1, 1.0, INTERVAL/2)
|
||||
|
||||
generate_new_alerts_in_series(test_perf_signature)
|
||||
|
@ -91,7 +84,7 @@ def test_detect_alerts_in_series(test_project, test_repository,
|
|||
PerformanceAlertSummary.UNTRIAGED, None)
|
||||
|
||||
# add data that should be enough to generate a new alert if we rerun
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, (INTERVAL+1), 2.0, INTERVAL)
|
||||
generate_new_alerts_in_series(test_perf_signature)
|
||||
|
||||
|
@ -103,22 +96,22 @@ def test_detect_alerts_in_series(test_project, test_repository,
|
|||
|
||||
|
||||
def test_detect_alerts_in_series_with_retriggers(
|
||||
test_project, test_repository, test_perf_signature, jm):
|
||||
test_project, test_repository, test_perf_signature):
|
||||
|
||||
# sometimes we detect an alert in the middle of a series
|
||||
# where there are retriggers, make sure we handle this case
|
||||
# gracefully by generating a sequence where the regression
|
||||
# "appears" in the middle of a series with the same resultset
|
||||
# "appears" in the middle of a series with the same push
|
||||
# to make sure things are calculated correctly
|
||||
base_time = time.time() # generate it based off current time
|
||||
for i in range(30):
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 1, 0.5, 1)
|
||||
for i in range(20):
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 2, 0.5, 1)
|
||||
for i in range(40):
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 2, 1.0, 1)
|
||||
|
||||
generate_new_alerts_in_series(test_perf_signature)
|
||||
|
@ -128,12 +121,12 @@ def test_detect_alerts_in_series_with_retriggers(
|
|||
|
||||
|
||||
def test_no_alerts_with_old_data(
|
||||
test_project, test_repository, test_perf_signature, jm):
|
||||
test_project, test_repository, test_perf_signature):
|
||||
base_time = 0 # 1970, too old!
|
||||
INTERVAL = 30
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 1, 0.5, INTERVAL/2)
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, (INTERVAL/2) + 1, 1.0, INTERVAL/2)
|
||||
|
||||
generate_new_alerts_in_series(test_perf_signature)
|
||||
|
@ -153,11 +146,11 @@ def test_custom_alert_threshold(
|
|||
# of 200% that should only generate 1
|
||||
INTERVAL = 60
|
||||
base_time = time.time()
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 1, 0.5, INTERVAL/3)
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, (INTERVAL/3) + 1, 0.6, INTERVAL/3)
|
||||
_generate_performance_data(test_repository, test_perf_signature, jm,
|
||||
_generate_performance_data(test_repository, test_perf_signature,
|
||||
base_time, 2*(INTERVAL/3) + 1, 2.0, INTERVAL/3)
|
||||
|
||||
generate_new_alerts_in_series(test_perf_signature)
|
||||
|
|
|
@ -168,17 +168,9 @@ def verify_products(products_ref):
|
|||
|
||||
def verify_result_sets(jm, result_sets_ref):
|
||||
|
||||
assert result_sets_ref.issubset(models.Push.objects.values_list(
|
||||
return result_sets_ref.issubset(models.Push.objects.values_list(
|
||||
'revision', flat=True))
|
||||
|
||||
ds_revisions = jm.get_dhub().execute(
|
||||
proc='jobs.selects.get_all_result_set_revisions',
|
||||
key_column='long_revision',
|
||||
return_type='set'
|
||||
)
|
||||
|
||||
assert result_sets_ref.issubset(ds_revisions)
|
||||
|
||||
|
||||
def verify_log_urls(jm, log_urls_ref):
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ from threading import local
|
|||
|
||||
import pytest
|
||||
|
||||
from treeherder.etl.job_loader import MissingResultsetException
|
||||
from treeherder.etl.job_loader import MissingPushException
|
||||
from treeherder.etl.tasks.pulse_tasks import store_pulse_jobs
|
||||
from treeherder.model.models import Job
|
||||
|
||||
|
@ -25,7 +25,7 @@ def test_retry_missing_revision_succeeds(sample_data, sample_resultset,
|
|||
orig_retry = store_pulse_jobs.retry
|
||||
|
||||
def retry_mock(exc=None, countdown=None):
|
||||
assert isinstance(exc, MissingResultsetException)
|
||||
assert isinstance(exc, MissingPushException)
|
||||
thread_data.retries += 1
|
||||
jm.store_result_set_data([rs])
|
||||
return orig_retry(exc=exc, countdown=countdown)
|
||||
|
@ -47,7 +47,7 @@ def test_retry_missing_revision_never_succeeds(sample_data, test_project,
|
|||
job = sample_data.pulse_jobs[0]
|
||||
job["origin"]["project"] = test_project
|
||||
|
||||
with pytest.raises(MissingResultsetException):
|
||||
with pytest.raises(MissingPushException):
|
||||
store_pulse_jobs.delay(job, "foo", "bar")
|
||||
|
||||
assert Job.objects.count() == 0
|
||||
|
|
|
@ -11,7 +11,8 @@ from treeherder.client import TreeherderJobCollection
|
|||
from treeherder.etl import (buildbot,
|
||||
common)
|
||||
from treeherder.model.derived.jobs import JobsModel
|
||||
from treeherder.model.models import Datasource
|
||||
from treeherder.model.models import (Datasource,
|
||||
Push)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
CACHE_KEYS = {
|
||||
|
@ -83,8 +84,6 @@ class Builds4hTransformerMixin(object):
|
|||
transform the builds4h structure into something we can ingest via
|
||||
our restful api
|
||||
"""
|
||||
revisions = defaultdict(list)
|
||||
|
||||
valid_projects = set(x.project for x in Datasource.objects.cached())
|
||||
|
||||
for build in data['builds']:
|
||||
|
@ -102,12 +101,9 @@ class Builds4hTransformerMixin(object):
|
|||
logger.warning("skipping builds-4hr job %s since missing property: %s", build['id'], str(e))
|
||||
continue
|
||||
|
||||
revisions[project].append(prop['revision'])
|
||||
|
||||
revisions_lookup = common.lookup_revisions(revisions)
|
||||
|
||||
job_ids_seen_last_time = cache.get(CACHE_KEYS['complete'], set())
|
||||
job_ids_seen_now = set()
|
||||
revisions_seen_for_project = defaultdict(set)
|
||||
|
||||
# Holds one collection per unique branch/project
|
||||
th_collections = {}
|
||||
|
@ -124,11 +120,17 @@ class Builds4hTransformerMixin(object):
|
|||
except KeyError:
|
||||
continue
|
||||
|
||||
try:
|
||||
resultset = revisions_lookup[project][prop['revision']]
|
||||
except KeyError:
|
||||
logger.warning("skipping builds-4hr job %s since %s revision %s not yet ingested", build['id'], project, prop['revision'])
|
||||
# it should be quite rare for a job to be ingested before a
|
||||
# revision, but it could happen
|
||||
revision = prop['revision']
|
||||
if (revision not in revisions_seen_for_project[project] and
|
||||
not Push.objects.filter(
|
||||
repository__name=project,
|
||||
revision__startswith=revision).exists()):
|
||||
logger.warning("skipping jobs since %s revision %s "
|
||||
"not yet ingested", project, revision)
|
||||
continue
|
||||
revisions_seen_for_project[project].add(revision)
|
||||
|
||||
# We record the id here rather than at the start of the loop, since we
|
||||
# must not count jobs whose revisions were not yet imported as processed,
|
||||
|
@ -149,7 +151,6 @@ class Builds4hTransformerMixin(object):
|
|||
|
||||
treeherder_data = {
|
||||
'revision': prop['revision'],
|
||||
'resultset_id': resultset['id'],
|
||||
'project': project,
|
||||
'coalesced': []
|
||||
}
|
||||
|
@ -272,9 +273,6 @@ class PendingRunningTransformerMixin(object):
|
|||
continue
|
||||
revision_dict[project].append(rev)
|
||||
|
||||
# retrieving the revision->resultset lookups
|
||||
revisions_lookup = common.lookup_revisions(revision_dict)
|
||||
|
||||
job_ids_seen_last_time = cache.get(CACHE_KEYS[source], set())
|
||||
job_ids_seen_now = set()
|
||||
|
||||
|
@ -284,15 +282,21 @@ class PendingRunningTransformerMixin(object):
|
|||
if common.should_skip_project(project, valid_projects, project_filter):
|
||||
continue
|
||||
|
||||
revisions_seen_now_for_project = set()
|
||||
|
||||
for revision, jobs in revisions.items():
|
||||
if common.should_skip_revision(revision, revision_filter):
|
||||
continue
|
||||
|
||||
try:
|
||||
resultset = revisions_lookup[project][revision]
|
||||
except KeyError:
|
||||
logger.warning("skipping jobs since %s revision %s not yet ingested", project, revision)
|
||||
# it should be quite rare for a job to be ingested before a
|
||||
# revision, but it could happen
|
||||
if revision not in revisions_seen_now_for_project and \
|
||||
not Push.objects.filter(repository__name=project,
|
||||
revision__startswith=revision).exists():
|
||||
logger.warning("skipping jobs since %s revision %s "
|
||||
"not yet ingested", project, revision)
|
||||
continue
|
||||
revisions_seen_now_for_project.add(revision)
|
||||
|
||||
# using project and revision form the revision lookups
|
||||
# to filter those jobs with unmatched revision
|
||||
|
@ -306,7 +310,6 @@ class PendingRunningTransformerMixin(object):
|
|||
|
||||
treeherder_data = {
|
||||
'revision': revision,
|
||||
'resultset_id': resultset['id'],
|
||||
'project': project,
|
||||
}
|
||||
|
||||
|
|
|
@ -54,24 +54,6 @@ def fetch_text(url):
|
|||
return response.text
|
||||
|
||||
|
||||
def lookup_revisions(revision_dict):
|
||||
"""
|
||||
Retrieve a list of revision->resultset lookups
|
||||
"""
|
||||
from treeherder.model.derived import JobsModel
|
||||
|
||||
lookup = dict()
|
||||
for project, revisions in revision_dict.items():
|
||||
revision_list = list(set(revisions))
|
||||
|
||||
with JobsModel(project) as jm:
|
||||
lookup_content = jm.get_resultset_all_revision_lookup(revision_list)
|
||||
|
||||
if lookup_content:
|
||||
lookup[project] = lookup_content
|
||||
return lookup
|
||||
|
||||
|
||||
def should_skip_project(project, valid_projects, project_filter):
|
||||
if project_filter and project != project_filter:
|
||||
return True
|
||||
|
|
|
@ -6,8 +6,9 @@ import newrelic.agent
|
|||
|
||||
from treeherder.etl.common import to_timestamp
|
||||
from treeherder.etl.schema import job_json_schema
|
||||
from treeherder.model.derived import DatasetNotFoundError
|
||||
from treeherder.model.derived.jobs import JobsModel
|
||||
from treeherder.model.models import (Push,
|
||||
Repository)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -60,32 +61,34 @@ class JobLoader:
|
|||
logger.warn("Skipping job due to bad attribute",
|
||||
exc_info=1)
|
||||
|
||||
try:
|
||||
jobs_model.store_job_data(storeable_job_list)
|
||||
except DatasetNotFoundError:
|
||||
logger.warn("Job with unsupported project: {}".format(project))
|
||||
jobs_model.store_job_data(storeable_job_list)
|
||||
|
||||
def clean_revision(self, pulse_job, jobs_model):
|
||||
# get the repository first (we'll throw an exception if it doesn't
|
||||
# exist)
|
||||
repository = Repository.objects.get(
|
||||
name=pulse_job["origin"]["project"])
|
||||
|
||||
# It is possible there will be either a revision or a revision_hash
|
||||
# At some point we will ONLY get revisions and no longer receive
|
||||
# revision_hashes and then this check can be removed.
|
||||
revision = pulse_job["origin"].get("revision", None)
|
||||
revision = pulse_job["origin"].get("revision")
|
||||
if revision:
|
||||
# will raise an exception if repository with name does not
|
||||
# exist (which we want, I think, to draw attention to the problem)
|
||||
# check the revision for this job has an existing resultset
|
||||
# If it doesn't, then except out so that the celery task will
|
||||
# retry till it DOES exist.
|
||||
if not jobs_model.get_resultset_top_revision_lookup([revision]):
|
||||
raise MissingResultsetException(
|
||||
"No resultset found in {} for revision {}".format(
|
||||
if not Push.objects.filter(repository=repository,
|
||||
revision__startswith=revision).exists():
|
||||
raise MissingPushException(
|
||||
"No push found in {} for revision {}".format(
|
||||
pulse_job["origin"]["project"],
|
||||
revision))
|
||||
|
||||
else:
|
||||
# This will also raise a ValueError if the resultset for the
|
||||
# revision_hash is not found.
|
||||
revision = jobs_model.get_revision_from_revision_hash(
|
||||
pulse_job["origin"]["revision_hash"]
|
||||
)
|
||||
revision = Push.objects.values_list('revision', flat=True).get(
|
||||
repository=repository,
|
||||
revision_hash=pulse_job["origin"]["revision_hash"])
|
||||
logger.warning(
|
||||
"Pulse job had revision_hash instead of revision: {}:{}".format(
|
||||
pulse_job["origin"]["project"],
|
||||
|
@ -320,5 +323,5 @@ class JobLoader:
|
|||
return validated_jobs
|
||||
|
||||
|
||||
class MissingResultsetException(Exception):
|
||||
class MissingPushException(Exception):
|
||||
pass
|
||||
|
|
|
@ -83,7 +83,6 @@ def _load_perf_artifact(project_name, reference_data, job_data, job_guid,
|
|||
|
||||
# data for performance series
|
||||
job_id = job_data[job_guid]['id']
|
||||
result_set_id = job_data[job_guid]['result_set_id']
|
||||
push = Push.objects.get(id=job_data[job_guid]['push_id'])
|
||||
|
||||
try:
|
||||
|
@ -140,7 +139,6 @@ def _load_perf_artifact(project_name, reference_data, job_data, job_guid,
|
|||
})
|
||||
(_, datum_created) = PerformanceDatum.objects.get_or_create(
|
||||
repository=repository,
|
||||
result_set_id=result_set_id,
|
||||
push=push,
|
||||
job_id=job_id,
|
||||
signature=signature,
|
||||
|
@ -194,7 +192,6 @@ def _load_perf_artifact(project_name, reference_data, job_data, job_guid,
|
|||
})
|
||||
(_, datum_created) = PerformanceDatum.objects.get_or_create(
|
||||
repository=repository,
|
||||
result_set_id=result_set_id,
|
||||
push=push,
|
||||
job_id=job_id,
|
||||
signature=signature,
|
||||
|
|
|
@ -499,220 +499,6 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
|
|||
# Allow some time for other queries to get through
|
||||
time.sleep(sleep_time)
|
||||
|
||||
def _add_short_revision_lookups(self, lookup):
|
||||
short_rev_lookup = {}
|
||||
for rev, rs in lookup.iteritems():
|
||||
short_rev_lookup[rev[:12]] = rs
|
||||
lookup.update(short_rev_lookup)
|
||||
|
||||
def _get_short_and_long_revision_query_params(self, revision_list,
|
||||
short_revision_field="short_revision",
|
||||
long_revision_field="long_revision"):
|
||||
"""Build params to search for both long and short revisions."""
|
||||
|
||||
long_revision_list = [x for x in revision_list if len(x) == 40]
|
||||
short_revision_list = [x[:12] for x in revision_list]
|
||||
long_rev_list_repl = ",".join(["%s"] * len(long_revision_list))
|
||||
short_rev_list_repl = ",".join(["%s"] * len(short_revision_list))
|
||||
|
||||
# It's possible that only 12 char revisions were passed in, so the
|
||||
# ``long_revision_list`` would be zero length. If it is, then this
|
||||
# adds nothing to the where clause.
|
||||
long_revision_or = ""
|
||||
if long_revision_list:
|
||||
long_revision_or = " OR {} IN ({})".format(
|
||||
long_revision_field,
|
||||
long_rev_list_repl
|
||||
)
|
||||
|
||||
replacement = " AND ({} IN ({}) {})".format(
|
||||
short_revision_field,
|
||||
short_rev_list_repl,
|
||||
long_revision_or
|
||||
)
|
||||
placeholders = short_revision_list + long_revision_list
|
||||
|
||||
return {
|
||||
"replacement": [replacement],
|
||||
"placeholders": placeholders
|
||||
}
|
||||
|
||||
def get_resultset_all_revision_lookup(self, revision_list):
|
||||
"""
|
||||
Create a revision->resultset lookup from a list of revisions
|
||||
|
||||
This will map ALL revision/commits that are within this resultset, not
|
||||
just the top revision. It will also map both short and long revisions
|
||||
to their resultsets because users might search by either.
|
||||
|
||||
This will retrieve non-active resultsets as well. Some of the data
|
||||
ingested has mixed up revisions that show for jobs, but are not in
|
||||
the right repository in builds4hr/running/pending. So we ingest those
|
||||
bad resultsets/revisions as non-active so that we don't keep trying
|
||||
to re-ingest them. Allowing this query to retrieve non ``active``
|
||||
resultsets means we will avoid re-doing that work by detecting that
|
||||
we've already ingested it.
|
||||
|
||||
But we skip ingesting the job, because the resultset is not active.
|
||||
"""
|
||||
|
||||
if not revision_list:
|
||||
return {}
|
||||
|
||||
# Build params to search for both long and short revisions.
|
||||
params = self._get_short_and_long_revision_query_params(
|
||||
revision_list,
|
||||
"revision.short_revision",
|
||||
"revision.long_revision")
|
||||
|
||||
proc = "jobs.selects.get_resultset_all_revision_lookup"
|
||||
lookup = self.execute(
|
||||
proc=proc,
|
||||
placeholders=params["placeholders"],
|
||||
debug_show=self.DEBUG,
|
||||
replace=params["replacement"],
|
||||
return_type="dict",
|
||||
key_column="long_revision"
|
||||
)
|
||||
|
||||
# ``lookups`` will be keyed ONLY by long_revision, at this point.
|
||||
# Add the resultsets keyed by short_revision.
|
||||
self._add_short_revision_lookups(lookup)
|
||||
return lookup
|
||||
|
||||
def get_resultset_top_revision_lookup(self, revision_list):
|
||||
"""
|
||||
Create a revision->resultset lookup only for top revisions of the RS
|
||||
|
||||
This lookup does NOT search any revision but the top revision
|
||||
for the resultset. It also does not do a JOIN to the revisions
|
||||
table. So if the resutlset has no revisions mapped to it, that's
|
||||
ok.
|
||||
"""
|
||||
if not revision_list:
|
||||
return {}
|
||||
|
||||
# Build params to search for both long and short revisions.
|
||||
params = self._get_short_and_long_revision_query_params(revision_list)
|
||||
lookup = self.execute(
|
||||
proc='jobs.selects.get_resultset_top_revision_lookup',
|
||||
placeholders=params["placeholders"],
|
||||
replace=params["replacement"],
|
||||
debug_show=self.DEBUG,
|
||||
key_column='long_revision',
|
||||
return_type='dict')
|
||||
|
||||
return lookup
|
||||
|
||||
def get_result_set_list(
|
||||
self, offset_id, limit, full=True, conditions=None):
|
||||
"""
|
||||
Retrieve a list of ``result_sets`` (also known as ``pushes``)
|
||||
If ``full`` is set to ``True`` then return revisions, too.
|
||||
No jobs
|
||||
|
||||
Mainly used by the restful api to list the pushes in the UI
|
||||
"""
|
||||
replace_str, placeholders = self._process_conditions(
|
||||
conditions, self.INDEXED_COLUMNS['result_set']
|
||||
)
|
||||
|
||||
# If a push doesn't have jobs we can just
|
||||
# message the user, it would save us a very expensive join
|
||||
# with the jobs table.
|
||||
|
||||
# Retrieve the filtered/limited list of result sets
|
||||
proc = "jobs.selects.get_result_set_list"
|
||||
result_set_ids = self.execute(
|
||||
proc=proc,
|
||||
replace=[replace_str],
|
||||
placeholders=placeholders,
|
||||
limit=limit,
|
||||
debug_show=self.DEBUG,
|
||||
)
|
||||
|
||||
aggregate_details = self.get_result_set_details(result_set_ids)
|
||||
|
||||
return_list = self._merge_result_set_details(
|
||||
result_set_ids, aggregate_details, full)
|
||||
|
||||
return return_list
|
||||
|
||||
def _merge_result_set_details(self, result_set_ids, aggregate_details, full):
|
||||
|
||||
# Construct the return dataset, include all revisions associated
|
||||
# with each result_set in the revisions attribute
|
||||
return_list = []
|
||||
for result in result_set_ids:
|
||||
|
||||
detail = aggregate_details[result['id']][0]
|
||||
list_item = {
|
||||
"id": result['id'],
|
||||
"revision_hash": result['revision_hash'],
|
||||
"push_timestamp": result['push_timestamp'],
|
||||
"repository_id": detail['repository_id'],
|
||||
"revision": detail['revision'],
|
||||
"author": result['author'] or detail['author'],
|
||||
"revision_count": len(aggregate_details[result['id']])
|
||||
}
|
||||
# we only return the first 20 revisions.
|
||||
if full:
|
||||
list_item.update({
|
||||
"comments": detail['comments'],
|
||||
"revisions": aggregate_details[result['id']][:20]
|
||||
})
|
||||
return_list.append(list_item)
|
||||
|
||||
return return_list
|
||||
|
||||
def get_result_set_details(self, result_set_ids):
|
||||
"""
|
||||
Retrieve all revisions associated with a set of ``result_set``
|
||||
(also known as ``pushes``) ids.
|
||||
|
||||
Mainly used by the restful api to list the pushes and their associated
|
||||
revisions in the UI
|
||||
"""
|
||||
|
||||
if not result_set_ids:
|
||||
# No result sets provided
|
||||
return {}
|
||||
|
||||
# Generate a list of result_set_ids
|
||||
ids = []
|
||||
id_placeholders = []
|
||||
for data in result_set_ids:
|
||||
id_placeholders.append('%s')
|
||||
ids.append(data['id'])
|
||||
|
||||
where_in_clause = ','.join(id_placeholders)
|
||||
|
||||
# Retrieve revision details associated with each result_set_id
|
||||
detail_proc = "jobs.selects.get_result_set_details"
|
||||
result_set_details = self.execute(
|
||||
proc=detail_proc,
|
||||
placeholders=ids,
|
||||
debug_show=self.DEBUG,
|
||||
replace=[where_in_clause],
|
||||
)
|
||||
|
||||
# Aggregate the revisions by result_set_id
|
||||
aggregate_details = {}
|
||||
for detail in result_set_details:
|
||||
|
||||
if detail['result_set_id'] not in aggregate_details:
|
||||
aggregate_details[detail['result_set_id']] = []
|
||||
|
||||
aggregate_details[detail['result_set_id']].append(
|
||||
{
|
||||
'revision': detail['revision'],
|
||||
'author': detail['author'],
|
||||
'repository_id': detail['repository_id'],
|
||||
'comments': detail['comments'],
|
||||
})
|
||||
|
||||
return aggregate_details
|
||||
|
||||
def _get_lower_tier_signatures(self):
|
||||
# get the lower tier data signatures for this project.
|
||||
# if there are none, then just return an empty list
|
||||
|
@ -822,32 +608,25 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
|
|||
rs_fields = ["revision", "revision_hash"]
|
||||
if not any([x for x in rs_fields if x in datum]):
|
||||
raise ValueError("Job must have either ``revision`` or ``revision_hash``")
|
||||
|
||||
revision = datum.get("revision", None)
|
||||
if not revision:
|
||||
if datum.get('revision'):
|
||||
push_id = Push.objects.values_list('id', flat=True).get(
|
||||
repository__name=self.project,
|
||||
revision__startswith=datum['revision'])
|
||||
else:
|
||||
revision_hash = datum.get('revision_hash')
|
||||
push_id = Push.objects.values_list('id', flat=True).get(
|
||||
repository__name=self.project,
|
||||
revision_hash=revision_hash)
|
||||
newrelic.agent.record_exception(
|
||||
exc=ValueError("job submitted with revision_hash but no revision"),
|
||||
params={
|
||||
"revision_hash": datum["revision_hash"]
|
||||
}
|
||||
)
|
||||
revision = self.get_revision_from_revision_hash(datum["revision_hash"])
|
||||
|
||||
# we assume that there is a result set for this revision by this point
|
||||
result_set_id_list = self.execute(
|
||||
proc='jobs.selects.get_resultset_id_from_revision',
|
||||
debug_show=self.DEBUG,
|
||||
placeholders=[revision])
|
||||
if not result_set_id_list:
|
||||
raise ValueError("Result set not found for revision")
|
||||
result_set_id = result_set_id_list[0]['id']
|
||||
push_id = Push.objects.values_list('id', flat=True).get(
|
||||
repository__name=self.project,
|
||||
revision__startswith=revision)
|
||||
|
||||
# load job
|
||||
(job_guid, reference_data_signature) = self._load_job(
|
||||
job, result_set_id, push_id, lower_tier_signatures)
|
||||
job, push_id, lower_tier_signatures)
|
||||
|
||||
for coalesced_guid in coalesced:
|
||||
coalesced_job_guid_placeholders.append(
|
||||
|
@ -972,7 +751,7 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
|
|||
revision_hash))
|
||||
return rh[0]["long_revision"]
|
||||
|
||||
def _load_job(self, job_datum, result_set_id, push_id, lower_tier_signatures):
|
||||
def _load_job(self, job_datum, push_id, lower_tier_signatures):
|
||||
"""
|
||||
Load a job into the treeherder database
|
||||
|
||||
|
@ -1122,7 +901,7 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
|
|||
job_guid,
|
||||
signature_hash,
|
||||
None, # idx:2, job_coalesced_to_guid,
|
||||
result_set_id,
|
||||
None,
|
||||
push_id,
|
||||
build_platform.id,
|
||||
machine_platform.id,
|
||||
|
@ -1164,7 +943,7 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
|
|||
[
|
||||
job_guid,
|
||||
None,
|
||||
result_set_id,
|
||||
None,
|
||||
push_id,
|
||||
machine.id,
|
||||
option_collection_hash,
|
||||
|
@ -1345,265 +1124,9 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
|
|||
logger.info("No new resultsets to store")
|
||||
return {}
|
||||
|
||||
#
|
||||
for result_set in result_sets:
|
||||
self._store_push(result_set)
|
||||
|
||||
# revision data structures
|
||||
revision_placeholders = []
|
||||
all_revisions = []
|
||||
rev_where_in_list = []
|
||||
|
||||
# revision_map structures
|
||||
revision_to_rs_revision_lookup = dict()
|
||||
|
||||
unique_rs_revisions = self._get_unique_revisions(result_sets)
|
||||
|
||||
# Retrieve a list of revisions that have already been stored
|
||||
# in the list of unique_revisions. Use it to determine the new
|
||||
# result_sets. Key this off of both long and short revisions since
|
||||
# we can get either
|
||||
resultsets_before = self.get_resultset_top_revision_lookup(
|
||||
unique_rs_revisions)
|
||||
self._add_short_revision_lookups(resultsets_before)
|
||||
resultset_revisions_before = resultsets_before.keys()
|
||||
|
||||
# UPDATE any resultsets that are incomplete
|
||||
#
|
||||
resultset_updates = self._get_resultset_updates(
|
||||
result_sets, resultsets_before)
|
||||
logger.info("Resultsets to update: {}".format(len(resultset_updates)))
|
||||
self._modify_resultsets(resultset_updates,
|
||||
"jobs.updates.update_result_set",
|
||||
revision_placeholders,
|
||||
all_revisions, rev_where_in_list,
|
||||
revision_to_rs_revision_lookup)
|
||||
|
||||
# INSERT any resultsets we don't already have
|
||||
#
|
||||
resultset_inserts = self._get_resultset_inserts(
|
||||
result_sets, resultset_revisions_before, unique_rs_revisions)
|
||||
|
||||
logger.info("Resultsets to insert: {}".format(len(resultset_inserts)))
|
||||
self._modify_resultsets(resultset_inserts,
|
||||
"jobs.inserts.set_result_set",
|
||||
revision_placeholders,
|
||||
all_revisions, rev_where_in_list,
|
||||
revision_to_rs_revision_lookup)
|
||||
|
||||
last_row_id = self.get_dhub().connection['master_host']['cursor'].lastrowid
|
||||
|
||||
# Retrieve new, updated and already existing result sets that
|
||||
# match all the revisions sent in during this request
|
||||
result_set_id_lookup = self.get_resultset_top_revision_lookup(
|
||||
unique_rs_revisions)
|
||||
self._add_short_revision_lookups(result_set_id_lookup)
|
||||
|
||||
# identify the newly inserted result sets
|
||||
result_set_ids_after = set(result_set_id_lookup.keys())
|
||||
inserted_result_sets = result_set_ids_after.difference(
|
||||
resultset_revisions_before
|
||||
)
|
||||
|
||||
inserted_result_set_ids = []
|
||||
|
||||
# If cursor.lastrowid is > 0 rows were inserted on this
|
||||
# cursor. When new rows are inserted, determine the new
|
||||
# result_set ids and submit publish to pulse tasks.
|
||||
if inserted_result_sets and last_row_id > 0:
|
||||
|
||||
for revision in inserted_result_sets:
|
||||
inserted_result_set_ids.append(
|
||||
result_set_id_lookup[revision]['id']
|
||||
)
|
||||
|
||||
# Revisions don't get updated, if we have conflicts here, they
|
||||
# are just skipped. This will insert revisions for both new
|
||||
# resultsets and resultset skeletons that were just updated.
|
||||
# Resultset skeletons don't get revisions till we insert them here.
|
||||
revision_id_lookup = self._insert_revisions(
|
||||
revision_placeholders, all_revisions, rev_where_in_list,
|
||||
revision_to_rs_revision_lookup, result_set_id_lookup)
|
||||
|
||||
return {
|
||||
'result_set_ids': result_set_id_lookup,
|
||||
'revision_ids': revision_id_lookup,
|
||||
'inserted_result_set_ids': inserted_result_set_ids
|
||||
}
|
||||
|
||||
def _get_resultset_updates(self, result_sets,
|
||||
resultsets_before):
|
||||
# find the existing resultsets that meet the requirements of needing
|
||||
# to be updated.
|
||||
rs_need_update = set()
|
||||
for rev, resultset in resultsets_before.items():
|
||||
if resultset["push_timestamp"] == 0 or \
|
||||
len(resultset["long_revision"]) < 40:
|
||||
rs_need_update.add(rev)
|
||||
|
||||
# collect the new values for the resultsets that needed updating
|
||||
# The revision ingested earlier that needs update could be either
|
||||
# 40 or 12 character. And the new one coming in could be either as
|
||||
# well. The rs_need_update will be keyed by both, but we must
|
||||
# check for both 12 and 40.
|
||||
resultset_updates = [x for x in result_sets
|
||||
if x["revision"] in rs_need_update or
|
||||
x["revision"][:12] in rs_need_update]
|
||||
return resultset_updates
|
||||
|
||||
def _get_resultset_inserts(self, result_sets,
|
||||
resultset_revisions_before,
|
||||
unique_rs_revisions):
|
||||
# find the revisions that we don't have resultsets for yet
|
||||
revisions_need_insert = unique_rs_revisions.difference(
|
||||
resultset_revisions_before)
|
||||
|
||||
# collect the new resultset values that need inserting
|
||||
resultset_inserts = [r for r in result_sets
|
||||
if r["revision"] in revisions_need_insert]
|
||||
return resultset_inserts
|
||||
|
||||
def _get_unique_revisions(self, result_sets):
|
||||
unique_rs_revisions = set()
|
||||
for result_set in result_sets:
|
||||
if "revision" in result_set:
|
||||
unique_rs_revisions.add(result_set["revision"])
|
||||
unique_rs_revisions.add(result_set["revision"][:12])
|
||||
else:
|
||||
top_revision = result_set['revisions'][-1]['revision']
|
||||
result_set["revision"] = top_revision
|
||||
unique_rs_revisions.add(top_revision)
|
||||
unique_rs_revisions.add(top_revision[:12])
|
||||
newrelic.agent.record_exception(
|
||||
exc=ValueError(
|
||||
"New resultset submitted without ``revision`` value"),
|
||||
params={"revision": top_revision}
|
||||
)
|
||||
return unique_rs_revisions
|
||||
|
||||
def _modify_resultsets(self, result_sets, procedure, revision_placeholders,
|
||||
all_revisions, rev_where_in_list,
|
||||
revision_to_rs_revision_lookup):
|
||||
"""
|
||||
Either insert or update resultsets, based on the ``procedure``.
|
||||
"""
|
||||
# result_set data structures
|
||||
result_set_placeholders = []
|
||||
unique_rs_revisions = set()
|
||||
where_in_list = []
|
||||
repository_id_lookup = dict()
|
||||
|
||||
for result in result_sets:
|
||||
top_revision = result["revision"]
|
||||
logger.info("Resultset {} with procedure: {}".format(
|
||||
top_revision,
|
||||
procedure))
|
||||
revision_hash = result.get("revision_hash", top_revision)
|
||||
short_top_revision = top_revision[:12]
|
||||
result_set_placeholders.append(
|
||||
[
|
||||
result.get('author', 'unknown@somewhere.com'),
|
||||
revision_hash,
|
||||
top_revision,
|
||||
short_top_revision,
|
||||
result['push_timestamp'],
|
||||
result.get('active_status', 'active'),
|
||||
top_revision,
|
||||
short_top_revision,
|
||||
revision_hash
|
||||
]
|
||||
)
|
||||
where_in_list.append('%s')
|
||||
unique_rs_revisions.add(top_revision)
|
||||
|
||||
for rev_datum in result['revisions']:
|
||||
|
||||
# Retrieve the associated repository id just once
|
||||
# and provide handling for multiple repositories
|
||||
if rev_datum['repository'] not in repository_id_lookup:
|
||||
repository_id = Repository.objects.values_list('id').get(
|
||||
name=rev_datum['repository'])[0]
|
||||
repository_id_lookup[rev_datum['repository']] = repository_id
|
||||
|
||||
# We may not have a comment in the push data
|
||||
comment = rev_datum.get(
|
||||
'comment', None
|
||||
)
|
||||
|
||||
repository_id = repository_id_lookup[rev_datum['repository']]
|
||||
long_revision = rev_datum['revision']
|
||||
short_revision = long_revision[:12]
|
||||
revision_placeholders.append(
|
||||
[long_revision,
|
||||
short_revision,
|
||||
long_revision,
|
||||
rev_datum['author'],
|
||||
comment,
|
||||
repository_id,
|
||||
long_revision,
|
||||
repository_id]
|
||||
)
|
||||
|
||||
all_revisions.append(long_revision)
|
||||
rev_where_in_list.append('%s')
|
||||
revision_to_rs_revision_lookup[long_revision] = top_revision
|
||||
|
||||
self.execute(
|
||||
proc=procedure,
|
||||
placeholders=result_set_placeholders,
|
||||
executemany=True,
|
||||
debug_show=self.DEBUG
|
||||
)
|
||||
|
||||
def _insert_revisions(self, revision_placeholders,
|
||||
all_revisions, rev_where_in_list,
|
||||
revision_to_rs_revision_lookup,
|
||||
result_set_id_lookup):
|
||||
if all_revisions:
|
||||
# Insert new revisions
|
||||
self.execute(
|
||||
proc='jobs.inserts.set_revision',
|
||||
placeholders=revision_placeholders,
|
||||
executemany=True,
|
||||
debug_show=self.DEBUG
|
||||
)
|
||||
|
||||
# Retrieve new revision ids
|
||||
rev_where_in_clause = ','.join(rev_where_in_list)
|
||||
revision_id_lookup = self.execute(
|
||||
proc='jobs.selects.get_revisions',
|
||||
placeholders=all_revisions,
|
||||
replace=[rev_where_in_clause],
|
||||
key_column='long_revision',
|
||||
return_type='dict',
|
||||
debug_show=self.DEBUG
|
||||
)
|
||||
|
||||
# Build placeholders for revision_map
|
||||
revision_map_placeholders = []
|
||||
for revision in revision_id_lookup:
|
||||
|
||||
rs_revision = revision_to_rs_revision_lookup[revision]
|
||||
revision_id = revision_id_lookup[revision]['id']
|
||||
result_set_id = result_set_id_lookup[rs_revision]['id']
|
||||
revision_map_placeholders.append(
|
||||
[revision_id,
|
||||
result_set_id,
|
||||
revision_id,
|
||||
result_set_id]
|
||||
)
|
||||
|
||||
# Insert new revision_map entries
|
||||
self.execute(
|
||||
proc='jobs.inserts.set_revision_map',
|
||||
placeholders=revision_map_placeholders,
|
||||
executemany=True,
|
||||
debug_show=self.DEBUG
|
||||
)
|
||||
else:
|
||||
revision_id_lookup = []
|
||||
return revision_id_lookup
|
||||
|
||||
def _store_push(self, result_set):
|
||||
repository = Repository.objects.get(name=self.project)
|
||||
result_set_revision = result_set.get('revision')
|
||||
|
|
|
@ -124,8 +124,7 @@ class Push(models.Model):
|
|||
repository = models.ForeignKey(Repository)
|
||||
revision_hash = models.CharField(max_length=50, null=True) # legacy
|
||||
# revision can be null if revision_hash defined ^^
|
||||
revision = models.CharField(max_length=40,
|
||||
null=True)
|
||||
revision = models.CharField(max_length=40, null=True)
|
||||
author = models.CharField(max_length=150)
|
||||
time = models.DateTimeField()
|
||||
|
||||
|
|
|
@ -42,57 +42,6 @@
|
|||
OR `job_guid` = ?
|
||||
)",
|
||||
|
||||
"host_type":"master_host"
|
||||
},
|
||||
"set_result_set":{
|
||||
|
||||
"sql":"INSERT INTO `result_set` (`author`,`revision_hash`,`long_revision`,`short_revision`,`push_timestamp`, `active_status`)
|
||||
SELECT ?,?,?,?,?,?
|
||||
FROM DUAL
|
||||
WHERE NOT EXISTS (
|
||||
SELECT `long_revision`, `push_timestamp`
|
||||
FROM `result_set`
|
||||
WHERE `long_revision` = ?
|
||||
OR `short_revision` = ?
|
||||
OR revision_hash = ?
|
||||
)",
|
||||
|
||||
"host_type":"master_host"
|
||||
},
|
||||
"set_revision":{
|
||||
|
||||
"sql":"INSERT INTO `revision` (
|
||||
`long_revision`,
|
||||
`short_revision`,
|
||||
`revision`,
|
||||
`author`,
|
||||
`comments`,
|
||||
`repository_id`
|
||||
)
|
||||
SELECT ?,?,?,?,?,?
|
||||
FROM DUAL
|
||||
WHERE NOT EXISTS (
|
||||
SELECT `revision`
|
||||
FROM `revision`
|
||||
WHERE `long_revision` = ? AND `repository_id` = ?
|
||||
)",
|
||||
|
||||
"host_type":"master_host"
|
||||
},
|
||||
"set_revision_map":{
|
||||
|
||||
"sql":"INSERT INTO `revision_map` (
|
||||
`revision_id`,
|
||||
`result_set_id`
|
||||
)
|
||||
SELECT ?,?
|
||||
FROM DUAL
|
||||
WHERE NOT EXISTS (
|
||||
SELECT `revision_id`, `result_set_id`
|
||||
FROM `revision_map`
|
||||
WHERE `revision_id` = ? AND `result_set_id` = ?
|
||||
)",
|
||||
|
||||
"host_type":"master_host"
|
||||
}
|
||||
},
|
||||
|
@ -162,23 +111,6 @@
|
|||
SET `failure_classification_id` = ?
|
||||
WHERE id = ?",
|
||||
|
||||
"host_type":"master_host"
|
||||
},
|
||||
"update_result_set":{
|
||||
|
||||
"sql":"UPDATE `result_set`
|
||||
SET
|
||||
`author` = ?,
|
||||
`revision_hash` = ?,
|
||||
`long_revision` = ?,
|
||||
`short_revision` = ?,
|
||||
`push_timestamp` = ?,
|
||||
`active_status` = ?
|
||||
WHERE long_revision = ?
|
||||
OR short_revision = ?
|
||||
OR revision_hash = ?
|
||||
",
|
||||
|
||||
"host_type":"master_host"
|
||||
}
|
||||
},
|
||||
|
@ -211,10 +143,6 @@
|
|||
"host_type":"master_host"
|
||||
|
||||
},
|
||||
"get_revision_ids_to_cycle":{
|
||||
"sql":"SELECT revision_id FROM revision_map WHERE result_set_id IN (REP0)",
|
||||
"host_type":"master_host"
|
||||
},
|
||||
|
||||
"get_jobs_to_cycle":{
|
||||
"sql":"SELECT id, job_guid FROM job WHERE submit_timestamp < ?",
|
||||
|
@ -428,90 +356,6 @@
|
|||
"host_type": "read_host"
|
||||
|
||||
},
|
||||
"get_resultset_top_revision_lookup":{
|
||||
"sql":"SELECT `id`, `short_revision`, `long_revision`, `revision_hash`, `push_timestamp`
|
||||
FROM `result_set`
|
||||
WHERE `active_status` in ('active', 'onhold')
|
||||
REP0",
|
||||
"host_type": "master_host"
|
||||
},
|
||||
"get_all_result_set_revisions":{
|
||||
"sql":"SELECT `long_revision`
|
||||
FROM `result_set`
|
||||
WHERE `active_status` = 'active'",
|
||||
"host_type": "read_host"
|
||||
},
|
||||
"get_revisions":{
|
||||
"sql":"SELECT `id`, `revision`, `long_revision` FROM `revision`
|
||||
WHERE `active_status` = 'active' AND `long_revision` IN (REP0)",
|
||||
|
||||
"host_type": "master_host"
|
||||
},
|
||||
"get_resultset_id_from_revision":{
|
||||
"sql":"SELECT `id`
|
||||
FROM `result_set`
|
||||
WHERE INSTR(`long_revision`, ?)",
|
||||
"host_type": "master_host"
|
||||
},
|
||||
"get_result_set_list":{
|
||||
"sql":"SELECT DISTINCT
|
||||
rs.id,
|
||||
rs.author,
|
||||
rs.long_revision,
|
||||
rs.revision_hash,
|
||||
rs.push_timestamp
|
||||
FROM result_set AS rs
|
||||
INNER JOIN revision_map
|
||||
ON rs.id = revision_map.result_set_id
|
||||
INNER JOIN revision
|
||||
ON revision_map.revision_id = revision.id
|
||||
WHERE rs.active_status = 'active'
|
||||
REP0
|
||||
ORDER BY rs.push_timestamp DESC
|
||||
",
|
||||
"host_type": "read_host"
|
||||
},
|
||||
"get_resultset_all_revision_lookup":{
|
||||
"sql":"SELECT
|
||||
rs.id,
|
||||
rs.push_timestamp,
|
||||
rs.active_status,
|
||||
revision.id as revision_id,
|
||||
revision.long_revision,
|
||||
revision.short_revision
|
||||
FROM result_set AS rs
|
||||
INNER JOIN revision_map
|
||||
ON rs.id = revision_map.result_set_id
|
||||
INNER JOIN revision
|
||||
ON revision_map.revision_id = revision.id
|
||||
WHERE 1
|
||||
REP0
|
||||
ORDER BY rs.push_timestamp DESC
|
||||
",
|
||||
"host_type": "read_host"
|
||||
},
|
||||
"get_revision_from_revision_hash":{
|
||||
"sql":"SELECT
|
||||
rs.long_revision
|
||||
FROM result_set AS rs
|
||||
WHERE rs.revision_hash = ?
|
||||
",
|
||||
"host_type": "read_host"
|
||||
},
|
||||
"get_result_set_details":{
|
||||
"sql":"SELECT
|
||||
rm.result_set_id,
|
||||
r.repository_id,
|
||||
r.revision,
|
||||
r.author,
|
||||
r.comments
|
||||
FROM revision_map AS rm
|
||||
LEFT JOIN revision AS r ON rm.revision_id = r.id
|
||||
WHERE r.active_status = 'active' AND rm.result_set_id IN (REP0)
|
||||
ORDER BY r.id DESC",
|
||||
|
||||
"host_type": "read_host"
|
||||
},
|
||||
"get_push_status":{
|
||||
"sql":"SELECT
|
||||
job.id,
|
||||
|
|
|
@ -5,7 +5,6 @@ from collections import namedtuple
|
|||
from django.conf import settings
|
||||
from django.db import transaction
|
||||
|
||||
from treeherder.model.models import Push
|
||||
from treeherder.perf.models import (PerformanceAlert,
|
||||
PerformanceAlertSummary,
|
||||
PerformanceDatum)
|
||||
|
@ -88,29 +87,12 @@ def generate_new_alerts_in_series(signature):
|
|||
# threshold
|
||||
continue
|
||||
|
||||
# temporary: look up result set information to go along
|
||||
# with push stuff
|
||||
from treeherder.model.derived import JobsModel
|
||||
with JobsModel(signature.repository.name) as jm:
|
||||
push = Push.objects.get(id=cur.testrun_id)
|
||||
result_set_id = jm.execute(
|
||||
proc='jobs.selects.get_resultset_id_from_revision',
|
||||
placeholders=[push.revision])[0]['id']
|
||||
if prev_testrun_id:
|
||||
prev_push = Push.objects.get(id=prev_testrun_id)
|
||||
prev_result_set_id = jm.execute(
|
||||
proc='jobs.selects.get_resultset_id_from_revision',
|
||||
placeholders=[prev_push.revision])[0]['id']
|
||||
else:
|
||||
prev_result_set_id = None
|
||||
summary, _ = PerformanceAlertSummary.objects.get_or_create(
|
||||
repository=signature.repository,
|
||||
framework=signature.framework,
|
||||
push_id=cur.testrun_id,
|
||||
prev_push_id=prev_testrun_id,
|
||||
defaults={
|
||||
'result_set_id': result_set_id,
|
||||
'prev_result_set_id': prev_result_set_id,
|
||||
'manually_created': False,
|
||||
'last_updated': datetime.datetime.utcfromtimestamp(
|
||||
cur.push_timestamp)
|
||||
|
|
|
@ -12,7 +12,6 @@ from rest_framework.response import Response
|
|||
from rest_framework.status import HTTP_400_BAD_REQUEST
|
||||
|
||||
from treeherder.model import models
|
||||
from treeherder.model.derived import JobsModel
|
||||
from treeherder.perf.alerts import get_alert_properties
|
||||
from treeherder.perf.models import (PerformanceAlert,
|
||||
PerformanceAlertSummary,
|
||||
|
@ -246,25 +245,12 @@ class PerformanceAlertSummaryViewSet(viewsets.ModelViewSet):
|
|||
ordering = ('-last_updated', '-id')
|
||||
pagination_class = AlertSummaryPagination
|
||||
|
||||
@staticmethod
|
||||
def _get_result_set_id_for_push(push_id):
|
||||
# horrible hack to get result set id information so we can roll
|
||||
# this back if necessary, remove later
|
||||
push = models.Push.objects.get(id=push_id)
|
||||
with JobsModel(push.repository.name) as jm:
|
||||
result_set_id_list = jm.execute(
|
||||
proc='jobs.selects.get_resultset_id_from_revision',
|
||||
placeholders=[push.revision])
|
||||
return result_set_id_list[0]['id']
|
||||
|
||||
def create(self, request, *args, **kwargs):
|
||||
data = request.data
|
||||
|
||||
alert_summary, _ = PerformanceAlertSummary.objects.get_or_create(
|
||||
repository_id=data['repository_id'],
|
||||
framework=PerformanceFramework.objects.get(id=data['framework_id']),
|
||||
result_set_id=self._get_result_set_id_for_push(data['push_id']),
|
||||
prev_result_set_id=self._get_result_set_id_for_push(data['prev_push_id']),
|
||||
push_id=data['push_id'],
|
||||
prev_push_id=data['prev_push_id'],
|
||||
defaults={
|
||||
|
|
Загрузка…
Ссылка в новой задаче