Merge pull request #229 from mozilla/bug-1072291-fix-pushlog-ingestion

(bug 1072291) revert pushlog caching strategy
jeads 2014-09-26 09:38:02 -07:00
Parent a2c561eb61 cb3d46df36
Commit 31bfc111f6
10 changed files with 75 additions and 63 deletions
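
In short: the caching strategy being reverted stored one cache key per ingested revision_hash so the transformer could skip result sets that were already loaded; the replacement keeps a single "<repository>:last_push" key and narrows the next pushlog request with Mercurial's fromchange parameter. A minimal sketch of the new fetch path, assuming the key name from the diff below (function name, URL and timeout are illustrative):

from django.core.cache import cache
import requests

def fetch_new_pushes(repository, source_url):
    # e.g. source_url = "https://hg.mozilla.org/mozilla-central/json-pushes/?full=1"
    last_push = cache.get("{0}:last_push".format(repository))
    if last_push:
        # only ask hg for pushes that landed after the one last ingested
        source_url = source_url + "&fromchange=" + last_push
    response = requests.get(source_url, timeout=30)
    response.raise_for_status()
    return response.json()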

View file

@@ -13,6 +13,8 @@ WebTest==1.3.4
WebOb==1.2
mock==1.0b1
responses==0.2.2
django-extensions==1.3.3
# in order to be able to run bin/generate-vendor-lib.py

View file

@@ -6,6 +6,7 @@ import json
import pytest
from django.core.management import call_command
from webtest.app import TestApp
import responses
from thclient.client import TreeherderRequest
@@ -367,3 +368,14 @@ def mock_get_remote_content(monkeypatch):
import treeherder.etl.common
monkeypatch.setattr(treeherder.etl.common, 'get_remote_content', _get_remote_content)
@pytest.fixture
def activate_responses(request):
responses.start()
def fin():
responses.stop()
request.addfinalizer(fin)
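
For reference, a test opts into the mocking simply by accepting the fixture; a minimal sketch (test name and URL are hypothetical):

def test_with_mocked_http(activate_responses):
    responses.add(responses.GET, "http://pushlog.example.test",
                  body='{}', status=200,
                  content_type='application/json')
    # any requests.get() issued during the test now hits the stub above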

View file

@@ -1,15 +1,22 @@
import os
import responses
from treeherder.etl.pushlog import HgPushlogProcess
def test_ingest_hg_pushlog(jm, initial_data, test_base_dir,
test_repository, mock_post_json_data):
test_repository, mock_post_json_data, activate_responses):
"""ingesting a number of pushes should populate result set and revisions"""
pushlog = os.path.join(test_base_dir, 'sample_data', 'hg_pushlog.json')
pushlog_path = os.path.join(test_base_dir, 'sample_data', 'hg_pushlog.json')
pushlog_content = open(pushlog_path).read()
pushlog_fake_url = "http://www.thisismypushlog.com"
responses.add(responses.GET, pushlog_fake_url,
body=pushlog_content, status=200,
content_type='application/json')
process = HgPushlogProcess()
process.run("file://{0}".format(pushlog), jm.project)
process.run(pushlog_fake_url, jm.project)
pushes_stored = jm.get_jobs_dhub().execute(
proc="jobs_test.selects.result_set_ids",

View file

@@ -5,9 +5,6 @@ import itertools
import pprint
import copy
from django.core.cache import cache
from treeherder.etl.common import generate_result_set_cache_key
from treeherder.model.derived.base import DatasetNotFoundError
from tests.sample_data_generator import job_data, result_set
from tests.sampledata import SampleData
@@ -359,12 +356,6 @@ def test_store_result_set_data(jm, initial_data, sample_resultset):
assert set(data['result_set_ids'].keys()) == revision_hashes
assert set(data['revision_ids'].keys()) == revisions
# confirm all revision_hashes were stored in the cache
for target_revision_hash in data['result_set_ids'].keys():
key = generate_result_set_cache_key(jm.project, target_revision_hash)
revision_hash = cache.get(key)
assert revision_hash == target_revision_hash
# Confirm the data structures returned match what's stored in
# the database
print '<><>EXP'

View file

@@ -116,8 +116,6 @@ def generate_revision_hash(revisions):
return sh.hexdigest()
def generate_result_set_cache_key(project, revision_hash):
return "{0}-{1}".format(project, revision_hash)
def generate_job_guid(request_id, request_time, endtime=None):
"""Converts a request_id and request_time into a guid"""

View file

@@ -133,6 +133,6 @@ class OAuthLoaderMixin(object):
if not response or response.status != 200:
message = response.read()
logger.error("collection loading failed: {0}".format(message))
raise Exception('collection loading failed: {0}'.format(message))

View file

@@ -1,9 +1,11 @@
from operator import itemgetter
from django.core.cache import cache
from django.conf import settings
import requests
from thclient import TreeherderRequest, TreeherderResultSetCollection
from .mixins import JsonExtractorMixin, OAuthLoaderMixin
from treeherder.etl.common import generate_revision_hash, generate_result_set_cache_key
from treeherder.etl.common import generate_revision_hash
class HgPushlogTransformerMixin(object):
@@ -11,7 +13,6 @@ class HgPushlogTransformerMixin(object):
def transform(self, pushlog, repository):
# this contain the whole list of transformed pushes
result_sets = []
th_collections = {}
@@ -47,15 +48,6 @@ class HgPushlogTransformerMixin(object):
result_set['revision_hash'] = generate_revision_hash(rev_hash_components)
cached_revision_hash = cache.get(
generate_result_set_cache_key(
repository, result_set['revision_hash']
) )
if cached_revision_hash == result_set['revision_hash']:
# Result set is already loaded
continue
if repository not in th_collections:
th_collections[ repository ] = TreeherderResultSetCollection()
@@ -65,21 +57,49 @@ class HgPushlogTransformerMixin(object):
return th_collections
class HgPushlogProcess(JsonExtractorMixin,
HgPushlogTransformerMixin,
class HgPushlogProcess(HgPushlogTransformerMixin,
OAuthLoaderMixin):
def extract(self, url):
response = requests.get(url, timeout=settings.TREEHERDER_REQUESTS_TIMEOUT)
response.raise_for_status()
return response.json()
def run(self, source_url, repository):
extracted_content = self.extract(source_url)
if extracted_content:
self.load(
self.transform(
extracted_content,
repository
# get the last object seen from cache. this will
# reduce the number of pushes processed every time
last_push = cache.get("{0}:last_push".format(repository))
if last_push:
try:
# make an attempt to use the last revision cached
extracted_content = self.extract(
source_url+"&fromchange="+last_push
)
except requests.exceptions.HTTPError, e:
# in case of a 404 error, delete the cache key
# and try it without any parameter
if e.response.status_code == 404:
cache.delete("{0}:last_push".format(repository))
extracted_content = self.extract(source_url)
else:
raise e
else:
extracted_content = self.extract(source_url)
if extracted_content:
sorted_pushlog = sorted(extracted_content.values(),
key=itemgetter('date'), reverse=True)
last_push = sorted_pushlog[0]
top_revision = last_push["changesets"][0]["node"]
transformed = self.transform(
extracted_content,
repository
)
self.load(transformed)
cache.set("{0}:last_push".format(repository), top_revision)
class GitPushlogTransformerMixin(object):
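
A note on how the cache value above is derived: json-pushes returns a dict keyed by push id, so run() sorts the pushes by date and takes the first changeset node of the newest push as the marker for the next fromchange query; if that changeset later vanishes from the repository, hg answers with a 404, which is why run() drops the key and falls back to a full fetch. A sketch with a hypothetical payload:

from operator import itemgetter

pushlog = {
    "16": {"date": 1411747000, "changesets": [{"node": "aaa111..."}]},
    "17": {"date": 1411748000, "changesets": [{"node": "bbb222..."}]},
}
sorted_pushlog = sorted(pushlog.values(),
                        key=itemgetter('date'), reverse=True)
top_revision = sorted_pushlog[0]["changesets"][0]["node"]  # "bbb222..."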

View file

@@ -16,7 +16,7 @@ from .tbpl import OrangeFactorBugRequest, TbplBugRequest, BugzillaBugRequest
from .pushlog import HgPushlogProcess
@task(name='fetch-buildapi-pending', time_limit=60)
@task(name='fetch-buildapi-pending', time_limit=3*60)
def fetch_buildapi_pending():
"""
Fetches the buildapi pending jobs api and load them to
@@ -25,7 +25,7 @@ def fetch_buildapi_pending():
PendingJobsProcess().run()
@task(name='fetch-buildapi-running', time_limit=60)
@task(name='fetch-buildapi-running', time_limit=3*60)
def fetch_buildapi_running():
"""
Fetches the buildapi running jobs api and load them to
@@ -34,7 +34,7 @@ def fetch_buildapi_running():
RunningJobsProcess().run()
@task(name='fetch-buildapi-build4h', time_limit=120)
@task(name='fetch-buildapi-build4h', time_limit=3*60)
def fetch_buildapi_build4h():
"""
Fetches the buildapi running jobs api and load them to
@@ -59,16 +59,16 @@ def fetch_push_logs():
rdm.disconnect()
@task(name='fetch-hg-push-logs', time_limit=60)
@task(name='fetch-hg-push-logs', time_limit=3*60)
def fetch_hg_push_log(repo_name, repo_url):
"""
Run a HgPushlog etl process
"""
process = HgPushlogProcess()
process.run(repo_url+'/json-pushes/?full=1&maxhours=24', repo_name)
process.run(repo_url+'/json-pushes/?full=1', repo_name)
@task(name='fetch-bugs', time_limit=60*5)
@task(name='fetch-bugs', time_limit=10*60)
def fetch_bugs():
"""
Run a BzApiBug process
@@ -86,7 +86,7 @@ def run_builds4h_analyzer():
process.run()
@task(name="submit-star-comment", max_retries=3, time_limit=30)
@task(name="submit-star-comment", max_retries=10, time_limit=30)
def submit_star_comment(project, job_id, bug_id, submit_timestamp, who):
"""
Send a post request to tbpl's starcomment.php containing a bug association.
@@ -103,7 +103,7 @@ def submit_star_comment(project, job_id, bug_id, submit_timestamp, who):
raise
@task(name="submit-build-star", max_retries=3, time_limit=30)
@task(name="submit-build-star", max_retries=10, time_limit=30)
def submit_build_star(project, job_id, who, bug_id=None, classification_id=None, note=None):
"""
Send a post request to tbpl's submitBuildStar.php to mirror sheriff's activity
@@ -120,7 +120,7 @@ def submit_build_star(project, job_id, who, bug_id=None, classification_id=None,
raise
@task(name="submit-bug-comment", max_retries=3, time_limit=30)
@task(name="submit-bug-comment", max_retries=10, time_limit=30)
def submit_bug_comment(project, job_id, bug_id):
"""
Send a post request to tbpl's submitBugzillaComment.php
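
The rest of this file is tuning: Celery's time_limit is a hard ceiling after which the worker kills the task, and max_retries caps how many times a failed tbpl submission is re-queued, so the fetchers get 3*60 seconds and the submitters more retry headroom. The recurring pattern, condensed from the diff (the decorator import is assumed to match the file's Celery version):

from celery import task
from .pushlog import HgPushlogProcess

@task(name='fetch-hg-push-logs', time_limit=3*60)  # hard-killed after 180s
def fetch_hg_push_log(repo_name, repo_url):
    # the old &maxhours=24 window is gone from the URL: the cached
    # fromchange marker now bounds how much pushlog gets fetched
    HgPushlogProcess().run(repo_url + '/json-pushes/?full=1', repo_name)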

View file

@@ -26,7 +26,6 @@ from .base import TreeherderModelBase, ObjectNotFoundException
from datasource.DataHub import DataHub
from treeherder.etl.perf_data_adapters import TalosDataAdapter
from treeherder.etl.common import generate_result_set_cache_key
logger = logging.getLogger(__name__)
@@ -61,9 +60,6 @@ class JobsModel(TreeherderModelBase):
INCOMPLETE_STATES = ["running", "pending"]
STATES = INCOMPLETE_STATES + ["completed", "coalesced"]
# Set timeout (seconds) to 48 hours
RESULT_SET_CACHE_TIMEOUT = (60 * 60) * 48
# indexes of specific items in the ``job_placeholder`` objects
JOB_PH_JOB_GUID = 0
JOB_PH_COALESCED_TO_GUID = 2
@@ -2589,18 +2585,11 @@
# result_set ids and submit publish to pulse tasks.
if inserted_result_sets and lastrowid > 0:
cache_data = {}
for revision_hash in inserted_result_sets:
inserted_result_set_ids.append(
result_set_id_lookup[revision_hash]['id']
)
key = generate_result_set_cache_key(
self.project, revision_hash
)
cache_data[key] = revision_hash
# Insert new revisions
dhub.execute(
proc='jobs.inserts.set_revision',
@@ -2609,11 +2598,6 @@
debug_show=self.DEBUG
)
cache.set_many(
cache_data,
self.RESULT_SET_CACHE_TIMEOUT
)
# Retrieve new revision ids
rev_where_in_clause = ','.join(rev_where_in_list)
select_proc = 'get_revision_ids'

View file

@@ -1,7 +1,5 @@
from django.contrib import admin
from treeherder.model.models import *
from django.core.cache import cache
from django_browserid.admin import site as browserid_admin