remove db access from the parse_log task

mdoglio 2014-03-10 19:34:48 +01:00
Parent f2a11cfcb8
Commit 9df5e5b108
6 changed files with 132 additions and 48 deletions

View file

@@ -0,0 +1,33 @@
import pytest
from webtest.app import TestApp
from thclient import TreeherderRequest
from treeherder.etl.oauth_utils import OAuthCredentials
from treeherder.webapp.wsgi import application
from tests.sampledata import SampleData
@pytest.fixture
def mock_send_request(monkeypatch, jm):
def _send(th_request, th_collection):
OAuthCredentials.set_credentials(SampleData.get_credentials())
credentials = OAuthCredentials.get_credentials(jm.project)
th_request.oauth_key = credentials['consumer_key']
th_request.oauth_secret = credentials['consumer_secret']
signed_uri = th_request.get_signed_uri(
th_collection.to_json(), th_request.get_uri(th_collection)
)
response = TestApp(application).post_json(
str(signed_uri), params=th_collection.get_collection_data()
)
response.getcode = lambda: response.status_int
return response
monkeypatch.setattr(TreeherderRequest, 'send', _send)
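
The fixture above reroutes TreeherderRequest.send through webtest's TestApp, so tests exercise the real OAuth signing and API stack in process, with no network or running server. A test opts in simply by requesting the fixture; a minimal sketch (the test name and collection contents are hypothetical):

from thclient import TreeherderArtifactCollection, TreeherderRequest

def test_submission_roundtrip(jm, mock_send_request):
    tac = TreeherderArtifactCollection()
    # ... add artifacts for jobs already stored through jm ...
    req = TreeherderRequest(protocol='http', host='localhost',
                            project=jm.project)
    response = req.send(tac)  # handled in process by the patched send
    assert response.getcode() == 200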

View file

@@ -23,7 +23,7 @@ def jobs_with_local_log(jm, initial_data):
return [job]
def test_parse_log(jm, initial_data, jobs_with_local_log, sample_resultset):
def test_parse_log(jm, initial_data, jobs_with_local_log, sample_resultset, mock_send_request):
"""
check that at least 2 job_artifacts get inserted when running
a parse_log task

View file

@@ -8,28 +8,27 @@ http://docs.celeryproject.org/en/latest/userguide/canvas.html#guide-canvas
"""
import simplejson as json
import re
import urllib
from celery import task
from django.conf import settings
from django.core.urlresolvers import reverse
from treeherder.model.derived import JobsModel, RefDataManager
from thclient import TreeherderArtifactCollection, TreeherderRequest
from treeherder.log_parser.artifactbuildercollection import ArtifactBuilderCollection
from treeherder.events.publisher import JobFailurePublisher, JobStatusPublisher
from treeherder.etl.common import get_remote_content
import urllib
from treeherder.etl.oauth_utils import OAuthCredentials
@task(name='parse-log')
def parse_log(project, job_id, result_set_id, check_errors=False):
def parse_log(project, log_url, job_guid, resultset, check_errors=False):
"""
Call ArtifactBuilderCollection on the given job.
"""
pattern_obj = re.compile('\d+:\d+:\d+\s+')
jm = JobsModel(project=project)
rdm = RefDataManager()
open_bugs_cache = {}
closed_bugs_cache = {}
@@ -39,26 +38,22 @@ def parse_log(project, job_id, result_set_id, check_errors=False):
try:
# return the resultset with the job id to identify if the UI wants
# to fetch the whole thing.
resultset = jm.get_result_set_by_id(result_set_id=result_set_id)[0]
del(resultset["active_status"])
del(resultset["revision_hash"])
log_references = jm.get_log_references(job_id)
artifact_uri = reverse("bugscache-list")
bugscache_uri = reverse("bugscache-list")
# we may have many log references per job
for log in log_references:
credentials = OAuthCredentials.get_credentials(project)
if log_url:
# parse a log given its url
artifact_bc = ArtifactBuilderCollection(
log['url'],
log_url,
check_errors=check_errors,
)
artifact_bc.parse()
artifact_list = []
for name, artifact in artifact_bc.artifacts.items():
artifact_list.append((job_id, name, 'json', json.dumps(artifact)))
artifact_list.append((job_guid, name, 'json', json.dumps(artifact)))
if check_errors:
# I'll try to begin with a full_text search on the entire row
@@ -80,7 +75,10 @@ def parse_log(project, job_id, result_set_id, check_errors=False):
})
open_bugs_cache[clean_line] = get_remote_content(
"{0}?{1}".format(artifact_uri, query_params)
"{0}{1}?{2}".format(
settings.API_HOSTNAME,
bugscache_uri,
query_params)
)
if clean_line not in closed_bugs_cache:
@@ -89,23 +87,41 @@
"status": 'closed'
})
closed_bugs_cache[clean_line] = get_remote_content(
"{0}?{1}".format(artifact_uri, query_params)
"{0}?{1}".format(
settings.API_HOSTNAME,
bugscache_uri,
query_params)
)
open_bugs_suggestions[ err['line'] ] = open_bugs_cache[clean_line]
closed_bugs_suggestions[ err['line'] ] = closed_bugs_cache[clean_line]
artifact_list.append((job_id, 'Open bugs', 'json', json.dumps(open_bugs_suggestions)))
artifact_list.append((job_id, 'Closed bugs', 'json', json.dumps(closed_bugs_suggestions)))
artifact_list.append((job_guid, 'Open bugs', 'json', json.dumps(open_bugs_suggestions)))
artifact_list.append((job_guid, 'Closed bugs', 'json', json.dumps(closed_bugs_suggestions)))
# store the artifacts generated
jm.store_job_artifact(artifact_list)
status_publisher.publish(job_id, resultset, project, 'processed')
tac = TreeherderArtifactCollection()
for artifact in artifact_list:
ta = tac.get_artifact({
"job_guid": artifact[0],
"name": artifact[1],
"type": artifact[2],
"blob": artifact[3]
})
tac.add(ta)
req = TreeherderRequest(
protocol=settings.TREEHERDER_REQUEST_PROTOCOL,
host=settings.TREEHERDER_REQUEST_HOST,
project=project,
oauth_key=credentials.get('consumer_key', None),
oauth_secret=credentials.get('consumer_secret', None),
)
req.send(tac)
status_publisher.publish(job_guid, resultset, project, 'processed')
if check_errors:
failure_publisher.publish(job_id, project)
failure_publisher.publish(job_guid, project)
finally:
rdm.disconnect()
jm.disconnect()
status_publisher.disconnect()
failure_publisher.disconnect()
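
After this change the task receives everything it needs as arguments instead of querying the database at parse time: the caller resolves the log URL, job guid, and resultset row up front. A sketch of the new call contract, mirroring the apply_async call further down in this commit (the literal values are hypothetical):

from treeherder.log_parser.tasks import parse_log

parse_log.apply_async(
    args=[
        'mozilla-central',                         # project
        'http://example.com/build42.log',          # log_url, no DB fetch needed
        'abcdef0123456789',                        # job_guid replaces the numeric job_id
        {'id': 42, 'push_timestamp': 1394476488},  # resultset row for the publishers
    ],
    kwargs={'check_errors': False},
    routing_key='parse_log.success',
)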

View file

@@ -546,15 +546,25 @@ class JobsModel(TreeherderModelBase):
return data
def get_result_set_by_id(self, result_set_id):
"""Get a single result_set by ``id``."""
proc = "jobs.selects.get_result_set_by_id"
def get_push_timestamp_lookup(self, result_set_ids):
"""Get the push timestamp for a list of result_set."""
# Generate a placeholder for each result_set id
id_placeholders = []
repl = []
for data in result_set_ids:
id_placeholders.append('%s')
repl.append(','.join(id_placeholders))
proc = "jobs.selects.get_result_set_push_timestamp"
data = self.get_jobs_dhub().execute(
proc=proc,
placeholders=[result_set_id],
placeholders=result_set_ids,
debug_show=self.DEBUG,
replace=repl,
return_type="dict",
key_column="id"
)
return data
##################
@@ -946,12 +956,7 @@ class JobsModel(TreeherderModelBase):
url = log.get('url', 'unknown')
url = url[0:255]
log_placeholders.append(
[
job_guid,
name,
url
] )
log_placeholders.append([job_guid, name, url])
artifact = job.get('artifact', {})
if artifact:
@@ -1057,9 +1062,12 @@ class JobsModel(TreeherderModelBase):
placeholders=job_placeholders,
executemany=True )
job_guid_where_in_clause = ",".join(job_guid_where_in_list)
return self.get_job_ids_by_guid(job_guid_list)
def get_job_ids_by_guid(self, job_guid_list):
job_guid_where_in_clause = ",".join(["%s"] * len(job_guid_list))
# Retrieve new job ids
job_id_lookup = self.get_jobs_dhub().execute(
proc='jobs.selects.get_job_ids_by_guids',
debug_show=self.DEBUG,
@@ -1070,6 +1078,7 @@
return job_id_lookup
def _load_log_urls(self, log_placeholders, job_id_lookup,
job_results):
@@ -1078,18 +1087,22 @@
tasks = []
result_sets = []
if log_placeholders:
for index, log_ref in enumerate(log_placeholders):
job_guid = log_placeholders[index][0]
job_guid = log_ref[0]
job_id = job_id_lookup[job_guid]['id']
result = job_results[job_guid]
result_set_id = job_id_lookup[job_guid]['result_set_id']
result_sets.append(result_set_id)
# Replace job_guid with id
log_placeholders[index][0] = job_id
task = dict()
task['id'] = job_id
task['job_guid'] = job_guid
task['log_url'] = log_ref[2]
task['result_set_id'] = result_set_id
if result != 'success':
task['check_errors'] = True
@@ -1099,6 +1112,9 @@
task['routing_key'] = 'parse_log.success'
tasks.append(task)
# a dict of result_set_id => push_timestamp
push_timestamp_lookup = self.get_push_timestamp_lookup(result_sets)
# Store the log references
self.get_jobs_dhub().execute(
proc='jobs.inserts.set_job_log_url',
@@ -1107,9 +1123,16 @@
executemany=True)
for task in tasks:
parse_log.apply_async(args=[self.project, task['id'], task['result_set_id']],
kwargs={'check_errors': task['check_errors']},
routing_key=task['routing_key'])
parse_log.apply_async(
args=[
self.project,
task['log_url'],
task['job_guid'],
push_timestamp_lookup[task['result_set_id']]
],
kwargs={'check_errors': task['check_errors']},
routing_key=task['routing_key']
)
def store_job_artifact(self, artifact_placeholders):
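
get_job_ids_by_guid, split out of the job-storing path here, is what lets both the log-url loader and the new artifact endpoint translate guids back to primary keys. A sketch of the lookup's shape (the guids are hypothetical):

job_id_lookup = jm.get_job_ids_by_guid(['guid-1', 'guid-2'])  # jm is a JobsModel
job_id = job_id_lookup['guid-1']['id']                    # primary key
result_set_id = job_id_lookup['guid-1']['result_set_id']  # batched for push timestamps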

View file

@@ -426,10 +426,10 @@
"host": "read_host"
},
"get_result_set_by_id":{
"sql":"SELECT *
"get_result_set_push_timestamp":{
"sql":"SELECT id, push_timestamp
FROM result_set
WHERE id = ?",
WHERE id IN (REP0)",
"host": "read_host"
},
"get_result_set_job_list":{

View file

@@ -220,7 +220,20 @@ class ArtifactViewSet(viewsets.ViewSet):
@with_jobs
@oauth_required
def create(self, request, project, jm):
jm.store_job_artifact(request.DATA)
artifact_data = []
job_guids = [x['job_guid'] for x in request.DATA]
job_id_lookup = jm.get_job_ids_by_guid(job_guids)
for datum in request.DATA:
artifact_data.append((
job_id_lookup[datum['job_guid']]['id'],
datum['name'],
datum['type'],
datum['blob']
))
jm.store_job_artifact(artifact_data)
return Response({'message': 'Artifacts stored successfully'})
@@ -650,7 +663,6 @@ class BugJobMapViewSet(viewsets.ViewSet):
return Response(objs)
#####################
# Refdata ViewSets
#####################
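
Taken together with the task change, this completes the round trip: parse_log posts artifacts keyed by job_guid over the OAuth-signed API, and the viewset maps each guid back to a primary key before storing. A client-side sketch using thclient (host, project, and values are hypothetical):

from thclient import TreeherderArtifactCollection, TreeherderRequest

tac = TreeherderArtifactCollection()
tac.add(tac.get_artifact({
    'job_guid': 'abcdef0123456789',   # hypothetical guid of a stored job
    'name': 'Bug suggestions',
    'type': 'json',
    'blob': '{"errors": []}',
}))
req = TreeherderRequest(
    protocol='https',
    host='treeherder.example.com',    # hypothetical host
    project='mozilla-central',
    oauth_key='key', oauth_secret='secret',  # hypothetical credentials
)
req.send(tac)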