Bug 1080219 - Refactor job artifact ingestion and fix performance data ingestion

Jonathan Eads 2014-10-10 18:09:59 -07:00
Parent c370f477e2
Commit 40ff9e74b0
5 changed files: 134 additions and 55 deletions

View file

@@ -15,7 +15,8 @@ class PerformanceDataAdapter(object):
     """
     performance_types = set([
-        'performance'
+        'performance',
+        'talos_data'
    ])

    def __init__(self):
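The new 'talos_data' entry lets ingestion recognize raw talos blobs by artifact name instead of artifact type. A minimal sketch of the membership check the jobs model applies further down (the artifact dict here is invented for illustration):

    from treeherder.etl.perf_data_adapters import PerformanceDataAdapter

    # Dict-flavored artifacts are routed by their 'name' property
    artifact = {'name': 'talos_data', 'type': 'json', 'blob': '{}'}
    is_performance = artifact['name'] in PerformanceDataAdapter.performance_types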
@@ -136,21 +137,23 @@ class TalosDataAdapter(PerformanceDataAdapter):
     def adapt_and_load(self, reference_data, job_data, datum):

-        datum['blob'] = json.loads(datum['blob'])
+        # Get just the talos datazilla structure for treeherder
+        target_datum = json.loads(datum['blob'])
+        talos_datum = target_datum['talos_data'][0]

-        validate(datum['blob'], self.datazilla_schema)
+        validate(talos_datum, self.datazilla_schema)

         _job_guid = datum["job_guid"]
         _name = datum["name"]
         _type = "performance"
-        _suite = datum["blob"]["testrun"]["suite"]
+        _suite = talos_datum["testrun"]["suite"]

         # data for performance series
         job_id = job_data[_job_guid]['id']
         result_set_id = job_data[_job_guid]['result_set_id']
         push_timestamp = job_data[_job_guid]['push_timestamp']

-        for _test in datum["blob"]["results"].keys():
+        for _test in talos_datum["results"].keys():

             signature_properties = {}
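The unwrapping above assumes the TALOSDATA blob nests the datazilla-style payload in a one-element 'talos_data' list. A hypothetical minimal blob, consistent with the keys this method reads (suite and replicate values invented):

    import json

    datum = {'blob': json.dumps({
        'talos_data': [{
            'testrun': {'suite': 'tp5', 'options': {}},
            'results': {'tresize': [100.0, 102.5, 99.1]}
        }]
    })}

    talos_datum = json.loads(datum['blob'])['talos_data'][0]
    assert talos_datum['testrun']['suite'] == 'tp5'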
@@ -160,13 +163,15 @@ class TalosDataAdapter(PerformanceDataAdapter):
                 'test':_test
             })

+            signature_prop_values = signature_properties.keys()
+            signature_prop_values.extend(signature_properties.values())
+
             series_signature = self.get_series_signature(
-                signature_properties.values()
-            )
+                signature_prop_values)

             series_data = self.calculate_series_data(
                 job_id, result_set_id, push_timestamp,
-                datum["blob"]["results"][_test]
+                talos_datum["results"][_test]
             )

             obj = {
@@ -179,17 +184,17 @@ class TalosDataAdapter(PerformanceDataAdapter):
                     "performance_series": series_data,
                     "testsuite": _suite,
                     "test": _test,
-                    "replicates": datum["blob"]["results"][_test],
+                    "replicates": talos_datum["results"][_test],
                     "metadata":{}
                 }
             }

-            options = datum["blob"]["testrun"].get(
+            options = talos_datum["testrun"].get(
                 "options", {})
             if options:
                 obj['blob']['metadata']['options'] = options
-            test_aux = datum["blob"].get(
+            test_aux = talos_datum.get(
                 "test_aux", {})
             if test_aux:
                 obj['blob']['metadata']['auxiliary_data'] = test_aux
@@ -224,7 +229,7 @@ class TalosDataAdapter(PerformanceDataAdapter):
         sha = sha1()
-        sha.update(''.join(map(lambda x: str(x), signature_values)))
+        sha.update(''.join(map(lambda x: str(x), sorted(signature_values))))
         signature = sha.hexdigest()
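Sorting the values before hashing makes the series signature stable no matter what order the signature properties arrive in, which matters now that keys and values are combined into one list. A standalone sketch of the hashing step (the .encode call is added for Python 3 compatibility; the commit's Python 2 code omits it):

    from hashlib import sha1

    def series_signature(values):
        sha = sha1()
        sha.update(''.join(map(lambda x: str(x), sorted(values))).encode('utf-8'))
        return sha.hexdigest()

    # Same properties in a different order yield an identical signature
    assert series_signature(['talos', 'tp5', 'osx-10-8']) == \
        series_signature(['osx-10-8', 'talos', 'tp5'])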

View file

@@ -33,7 +33,8 @@ class ArtifactBuilderBase(object):
         """Parse a single line of the log."""
         # truncate the line to the max line-length
-        line = line[:self.MAX_LINE_LENGTH]
+        if "TALOSDATA" not in line:
+            line = line[:self.MAX_LINE_LENGTH]

         for parser in self.parsers:
             if not parser.complete:

View file

@@ -364,7 +364,6 @@ class ErrorParser(ParserBase):

 RE_TALOSDATA = re.compile('.*?TALOSDATA: (\[.*\])$')

-
 class TalosParser(ParserBase):
     """a sub-parser to find TALOSDATA"""

@@ -375,7 +374,7 @@ class TalosParser(ParserBase):
         """check each line for TALOSDATA"""
         match = RE_TALOSDATA.match(line)
-        if "TALOSDATA: " in line and match:
+        if "TALOSDATA" in line and match:
             try:
                 self.artifact = json.loads(match.group(1))
             except ValueError:
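The regex anchors the JSON array to the end of the line with $, which is why the artifact builder above stopped truncating TALOSDATA lines: clipping the line would drop the closing bracket and kill the match. A small demonstration with an invented log line:

    import json
    import re

    RE_TALOSDATA = re.compile('.*?TALOSDATA: (\[.*\])$')

    line = '12:01:42 INFO - TALOSDATA: [{"testrun": {"suite": "tp5"}}]'
    match = RE_TALOSDATA.match(line)
    if "TALOSDATA" in line and match:
        artifact = json.loads(match.group(1))  # a list with one test run

    # A truncated line no longer ends in ']', so the pattern fails
    assert RE_TALOSDATA.match(line[:40]) is None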

View file

@@ -25,7 +25,8 @@ from .base import TreeherderModelBase, ObjectNotFoundException
 from datasource.DataHub import DataHub

-from treeherder.etl.perf_data_adapters import TalosDataAdapter
+from treeherder.etl.perf_data_adapters import (PerformanceDataAdapter,
+                                               TalosDataAdapter)

 logger = logging.getLogger(__name__)
@@ -1425,7 +1426,7 @@ class JobsModel(TreeherderModelBase):
             self._load_log_urls(log_placeholders, job_id_lookup,
                                 job_results)

-            self._load_job_artifacts(artifact_placeholders, job_id_lookup)
+            self.load_job_artifacts(artifact_placeholders, job_id_lookup)

         # If there is already a job_id stored with pending/running status
         # we need to update the information for the complete job
@@ -2215,19 +2216,118 @@ class JobsModel(TreeherderModelBase):
             debug_show=self.DEBUG,
             placeholders=[lock_string])

-    def _load_job_artifacts(self, artifact_placeholders, job_id_lookup):
-        """
-        Store a list of job artifacts substituting job_guid with job_id
-        """
-        # Replace job_guid with id in artifact placeholders
-        for index, artifact in enumerate(artifact_placeholders):
-            job_id = job_id_lookup[
-                artifact_placeholders[index][0]]['id']
-            artifact_placeholders[index][0] = job_id
-            artifact_placeholders[index][4] = job_id
-
-        if artifact_placeholders:
-            self.store_job_artifact(artifact_placeholders)
+    def load_job_artifacts(self, artifact_data, job_id_lookup):
+        """
+        Determine what type of artifacts are contained in artifact_data and
+        store a list of job artifacts substituting job_guid with job_id. All
+        of the datums in artifact_data need to be of one of the three
+        different tasty "flavors" described below.
+
+        artifact_placeholders:
+
+            Comes in through the web service as the "artifacts" property
+            in a job in a job collection
+            (https://github.com/mozilla/treeherder-client#job-collection)
+
+            A list of lists
+
+            [
+                [job_guid, name, artifact_type, blob, job_guid, name]
+            ]
+
+        job_artifact_collection:
+
+            A list of artifact dicts, each with 'job_guid', 'name',
+            'type', and 'blob' keys
+
+        performance_artifact:
+
+            The same dict structure, but with a 'name' found in
+            PerformanceDataAdapter.performance_types
+        """
+        artifact_placeholders_list = []
+        job_artifact_list = []
+        performance_artifact_list = []
+        performance_artifact_job_id_list = []
+
+        for artifact in artifact_data:
+
+            artifact_placeholders = False
+            job_artifact_collection = False
+            performance_artifact_collection = False
+
+            # Determine what type of artifact we have received
+            if artifact:
+                if type(artifact) is list:
+                    artifact_placeholders = True
+                else:
+                    artifact_name = artifact['name']
+                    if artifact_name in PerformanceDataAdapter.performance_types:
+                        performance_artifact_collection = True
+                    else:
+                        job_artifact_collection = True
+
+            # Call the correct adapter for the data type
+            if artifact_placeholders:
+                self._adapt_job_artifact_placeholders(
+                    artifact, artifact_placeholders_list, job_id_lookup)
+
+            if job_artifact_collection:
+                self._adapt_job_artifact_collection(
+                    artifact, job_artifact_list, job_id_lookup)
+
+            if performance_artifact_collection:
+                self._adapt_performance_artifact_collection(
+                    artifact, performance_artifact_list,
+                    performance_artifact_job_id_list, job_id_lookup)
+
+        # Store the various artifact types if we collected them
+        if artifact_placeholders_list:
+            self.store_job_artifact(artifact_placeholders_list)
+
+        if job_artifact_list:
+            self.store_job_artifact(job_artifact_list)
+
+        if performance_artifact_list and performance_artifact_job_id_list:
+            self.store_performance_artifact(
+                performance_artifact_job_id_list, performance_artifact_list)
+
+    def _adapt_job_artifact_placeholders(
+            self, artifact, artifact_data, job_id_lookup):
+
+        job_guid = artifact[0]
+        job_id = job_id_lookup.get(job_guid, {}).get('id', None)
+
+        if job_id:
+            # Replace job_guid with id in the placeholder and collect
+            # it for storage
+            artifact[0] = job_id
+            artifact[4] = job_id
+            artifact_data.append(artifact)
+
+    def _adapt_job_artifact_collection(
+            self, artifact, artifact_data, job_id_lookup):
+
+        job_id = job_id_lookup.get(
+            artifact['job_guid'], {}
+        ).get('id', None)
+
+        if job_id:
+            artifact_data.append((
+                job_id,
+                artifact['name'],
+                artifact['type'],
+                artifact['blob'],
+                job_id,
+                artifact['name'],
+            ))
+
+    def _adapt_performance_artifact_collection(
+            self, artifact, artifact_data, job_id_list, job_id_lookup):
+
+        job_id = job_id_lookup.get(
+            artifact['job_guid'], {}
+        ).get('id', None)
+
+        if job_id:
+            job_id_list.append(job_id)
+            artifact_data.append(artifact)

     def _get_last_insert_id(self, contenttype="jobs"):
         """Return last-inserted ID."""

View file

@@ -38,36 +38,10 @@ class ArtifactViewSet(viewsets.ViewSet):
     @with_jobs
     @oauth_required
     def create(self, request, project, jm):
-        artifact_data = []
-        performance_artifact_data = []
-        job_id_list = []
-
         job_guids = [x['job_guid'] for x in request.DATA]
         job_id_lookup = jm.get_job_ids_by_guid(job_guids)

-        for datum in request.DATA:
-            job_id = job_id_lookup.get( datum['job_guid'], {}).get('id', None)
-            if job_id:
-                if datum['type'] in PerformanceDataAdapter.performance_types:
-                    job_id_list.append(job_id)
-                    performance_artifact_data.append(datum)
-                else:
-                    artifact_data.append((
-                        job_id,
-                        datum['name'],
-                        datum['type'],
-                        datum['blob'],
-                        job_id,
-                        datum['name'],
-                    ))
-
-        if artifact_data:
-            jm.store_job_artifact(artifact_data)
-
-        if job_id_list and performance_artifact_data:
-            jm.store_performance_artifact(
-                job_id_list, performance_artifact_data)
+        jm.load_job_artifacts(request.DATA, job_id_lookup)

         return Response({'message': 'Artifacts stored successfully'})