Mirror of https://github.com/mozilla/treeherder.git
Bug 1191934 - Remove the now-redundant fetch-missing-pushlogs task
The task was a workaround for pushes we were missing; however, the root causes of these have since been fixed.
Parent: 01bb61a91b
Commit: 92793e569e
Procfile (2 lines changed)
@@ -1,6 +1,6 @@
 web: newrelic-admin run-program gunicorn treeherder.config.wsgi:application --log-file - --timeout 29 --max-requests 2000
 worker_beat: newrelic-admin run-program celery -A treeherder beat
-worker_pushlog: newrelic-admin run-program celery -A treeherder worker -Q pushlog,fetch_missing_push_logs --maxtasksperchild=500 --concurrency=5
+worker_pushlog: newrelic-admin run-program celery -A treeherder worker -Q pushlog --maxtasksperchild=500 --concurrency=5
 worker_buildapi_pending: newrelic-admin run-program celery -A treeherder worker -Q buildapi_pending --maxtasksperchild=20 --concurrency=5
 worker_buildapi_running: newrelic-admin run-program celery -A treeherder worker -Q buildapi_running --maxtasksperchild=20 --concurrency=5
 worker_buildapi_4hr: newrelic-admin run-program celery -A treeherder worker -Q buildapi_4hr --maxtasksperchild=20 --concurrency=1

@@ -15,7 +15,7 @@ if [ ! -f $LOGFILE ]; then
 fi
 
 exec newrelic-admin run-program celery -A treeherder worker \
-    -Q pushlog,fetch_missing_push_logs \
+    -Q pushlog \
     --concurrency=5 --logfile=$LOGFILE -l INFO \
     --maxtasksperchild=500 -n pushlog.%h
 
@@ -1,4 +1,3 @@
-import json
 import os
 
 import pytest

@@ -57,36 +56,6 @@ def mock_buildapi_builds4h_url(activate_responses):
                   content_type='application/json')
 
 
-@pytest.fixture
-def mock_buildapi_pending_missing1_url(activate_responses):
-    tests_folder = os.path.dirname(os.path.dirname(__file__))
-    path = os.path.join(
-        tests_folder,
-        "sample_data",
-        "builds-pending-missing1.json"
-    )
-    with open(path) as f:
-        mocked_content = f.read()
-    responses.add(responses.GET, settings.BUILDAPI_PENDING_URL,
-                  body=mocked_content, status=200,
-                  content_type='application/json')
-
-
-@pytest.fixture
-def mock_buildapi_running_missing1_url(activate_responses):
-    tests_folder = os.path.dirname(os.path.dirname(__file__))
-    path = os.path.join(
-        tests_folder,
-        "sample_data",
-        "builds-running-missing1.json"
-    )
-    with open(path) as f:
-        mocked_content = f.read()
-    responses.add(responses.GET, settings.BUILDAPI_RUNNING_URL,
-                  body=mocked_content, status=200,
-                  content_type='application/json')
-
-
 @pytest.fixture
 def mock_buildapi_builds4h_missing1_url(activate_responses):
     tests_folder = os.path.dirname(os.path.dirname(__file__))

@@ -228,83 +197,28 @@ def test_ingest_running_job_fields(jm,
     assert len(stored_obj) == 1
     assert stored_obj[0]["start_timestamp"] is not 0
 
-#####################
-# MISSING RESULTSETS
-#####################
-
 
 def test_ingest_builds4h_jobs_1_missing_resultset(jm,
                                                   sample_resultset, mock_buildapi_builds4h_missing1_url,
-                                                  mock_post_json, mock_log_parser, mock_get_resultset,
-                                                  activate_responses):
+                                                  mock_post_json, mock_log_parser, mock_get_resultset):
     """
-    Ensure the builds4h job with the missing resultset is queued for refetching
+    Ensure the builds4h job with the missing resultset is not ingested
     """
     etl_process = Builds4hJobsProcess()
-    _do_missing_resultset_test(jm, etl_process)
+    etl_process.run()
+
+    stored_obj = jm.get_dhub().execute(proc="jobs_test.selects.jobs")
+    assert len(stored_obj) == 1
 
 
 def test_ingest_builds4h_jobs_missing_branch(jm,
                                              sample_resultset, mock_buildapi_builds4h_missing_branch_url,
                                              mock_post_json, mock_log_parser, mock_get_resultset):
     """
-    Ensure the builds4h job with the missing resultset is queued for refetching
+    Ensure the builds4h job with the missing branch is not ingested
     """
     etl_process = Builds4hJobsProcess()
 
     etl_process.run()
 
     stored_obj = jm.get_dhub().execute(proc="jobs_test.selects.jobs")
 
     assert len(stored_obj) == 0
-
-
-def _do_missing_resultset_test(jm, etl_process):
-    new_revision = "222222222222b344655ed7be9a408d2970a736c4"
-    pushlog_content = json.dumps(
-        {
-            "pushes":
-            {"33270": {
-                "date": 1378288232,
-                "changesets": [
-                    {
-                        "node": new_revision,
-                        "tags": [],
-                        "author": "John Doe <jdoe@mozilla.com>",
-                        "branch": "default",
-                        "desc": "bug 909264 - control characters"
-                    }
-                ],
-                "user": "jdoe@mozilla.com"
-            }}
-        }
-    )
-
-    # pending (and sometimes running) jobs only come to us with short revisions
-    # but complete are 40, at least in our dest data.
-    # So, for our tests, we may check json-pushes for either a long or a short
-    # revision. We need to add both to ``responses`` here.
-    for revision in [new_revision, new_revision[:12]]:
-        rev_url = "https://hg.mozilla.org/mozilla-central/json-pushes/?full=1&version=2&changeset=" + revision
-        responses.add(responses.GET, rev_url,
-                      body=pushlog_content, status=200,
-                      match_querystring=True,
-                      content_type='application/json')
-
-    etl_process.run()
-
-    stored_obj = jm.get_dhub().execute(proc="jobs_test.selects.jobs")
-
-    assert len(stored_obj) == 1
-
-    revisions_stored = jm.get_dhub().execute(
-        proc="jobs_test.selects.revision_ids",
-        return_type='tuple'
-    )
-
-    assert len(revisions_stored) == 20
-    was_stored = False
-    for rs in revisions_stored:
-        if str(rs['revision']) == new_revision:
-            was_stored = True
-    assert was_stored

@@ -5,8 +5,7 @@ import responses
 from django.conf import settings
 from django.core.cache import cache
 
-from treeherder.etl.pushlog import (HgPushlogProcess,
-                                    MissingHgPushlogProcess)
+from treeherder.etl.pushlog import HgPushlogProcess
 
 
 def test_ingest_hg_pushlog(jm, test_base_dir,

@@ -98,40 +97,6 @@ def test_ingest_hg_pushlog_already_stored(jm, test_base_dir,
     assert len(pushes_stored) == 2
 
 
-def test_ingest_hg_pushlog_not_found_in_json_pushes(jm, test_base_dir,
-                                                    test_repository, mock_post_json,
-                                                    activate_responses):
-    """
-    Ingest a pushlog that is not found in json-pushes. So we ingest a
-    resultset that is "onhold"
-
-    """
-
-    pushlog_fake_url = "http://www.thisismypushlog.com"
-    responses.add(responses.GET, pushlog_fake_url,
-                  body="foo", status=404,
-                  content_type='application/json')
-
-    process = MissingHgPushlogProcess()
-
-    process.run(pushlog_fake_url, jm.project, "123456789012")
-
-    pushes_stored = jm.get_dhub().execute(
-        proc="jobs_test.selects.result_sets",
-        return_type='tuple'
-    )
-
-    assert len(pushes_stored) == 1
-    assert pushes_stored[0]['active_status'] == "onhold"
-
-    revisions_stored = jm.get_dhub().execute(
-        proc="jobs_test.selects.revision_ids",
-        return_type='tuple'
-    )
-
-    assert len(revisions_stored) == 1
-
-
 def test_ingest_hg_pushlog_cache_last_push(jm, test_repository,
                                            test_base_dir, mock_post_json,
                                            activate_responses):

@@ -1,12 +0,0 @@
-{
-    "pending":{
-        "test_treeherder_jobs": {
-            "45f8637cb9f7": [
-                {"submitted_at": 1369304814, "id": 24575179, "buildername": "WINNT 6.2 try debug test mochitest-4"}
-            ],
-            "222222222222": [
-                {"submitted_at": 1369304820, "id": 24575180, "buildername": "WINNT 6.2 try debug test mochitest-4"}
-            ]
-        }
-    }
-}

@@ -1,36 +0,0 @@
-{
-    "running": {
-        "test_treeherder_jobs": {
-            "45f8637cb9f7": [
-                {
-                    "submitted_at": 1369231311,
-                    "buildername": "WINNT 5.2 profiling build",
-                    "start_time": 1369231311,
-                    "number": 3,
-                    "claimed_by_name": "buildbot-master66.srv.releng.usw2.mozilla.com:/builds/buildbot/build1/master",
-                    "request_ids": [
-                        24526180
-                    ],
-                    "last_heartbeat": 1369231939,
-                    "id": 24767134,
-                    "revision": "45f8637cb9f78f19cb8463ff174e81756805d8cf"
-                }
-            ],
-            "222222222222": [
-                {
-                    "submitted_at": 1369231312,
-                    "buildername": "WINNT 5.2 profiling build",
-                    "start_time": 1369231312,
-                    "number": 3,
-                    "claimed_by_name": "buildbot-master66.srv.releng.usw2.mozilla.com:/builds/buildbot/build1/master",
-                    "request_ids": [
-                        24526181
-                    ],
-                    "last_heartbeat": 1369231940,
-                    "id": 24767134,
-                    "revision": "222222222222b344655ed7be9a408d2970a736c4"
-                }
-            ]
-        }
-    }
-}

@@ -202,7 +202,6 @@ CELERY_QUEUES = [
    Queue('error_summary', Exchange('default'), routing_key='error_summary'),
    Queue('publish_to_pulse', Exchange('default'), routing_key='publish_to_pulse'),
    Queue('pushlog', Exchange('default'), routing_key='pushlog'),
-   Queue('fetch_missing_push_logs', Exchange('default'), routing_key='fetch_missing_push_logs'),
    Queue('buildapi_pending', Exchange('default'), routing_key='buildapi_pending'),
    Queue('buildapi_running', Exchange('default'), routing_key='buildapi_running'),
    Queue('buildapi_4hr', Exchange('default'), routing_key='buildapi_4hr'),

@@ -82,7 +82,6 @@ class Builds4hTransformerMixin(object):
         our restful api
         """
         revisions = defaultdict(list)
-        missing_resultsets = defaultdict(set)
 
         valid_projects = set(x.project for x in Datasource.objects.cached())
 

@@ -120,13 +119,13 @@ class Builds4hTransformerMixin(object):
                     continue
                 if common.should_skip_revision(prop['revision'], revision_filter):
                     continue
-                resultset = common.get_resultset(project,
-                                                 revisions_lookup,
-                                                 prop['revision'],
-                                                 missing_resultsets,
-                                                 logger)
             except KeyError:
-                # There was no matching resultset, skip the job.
                 continue
+
+            try:
+                resultset = revisions_lookup[project][prop['revision']]
+            except KeyError:
+                logger.warning("skipping builds-4hr job %s since %s revision %s not yet ingested", build['id'], project, prop['revision'])
+                continue
 
             # We record the id here rather than at the start of the loop, since we

@@ -243,9 +242,6 @@ class Builds4hTransformerMixin(object):
             th_job = th_collections[project].get_job(treeherder_data)
             th_collections[project].add(th_job)
 
-        if missing_resultsets and not revision_filter:
-            common.fetch_missing_resultsets("builds4h", missing_resultsets, logger)
-
         num_new_jobs = len(job_ids_seen_now.difference(job_ids_seen_last_time))
         logger.info("Imported %d completed jobs, skipped %d previously seen",
                     num_new_jobs, len(job_ids_seen_now) - num_new_jobs)

@@ -263,7 +259,6 @@ class PendingRunningTransformerMixin(object):
         """
         valid_projects = set(x.project for x in Datasource.objects.cached())
         revision_dict = defaultdict(list)
-        missing_resultsets = defaultdict(set)
 
         # loop to catch all the revisions
         for project, revisions in data[source].iteritems():

@@ -292,13 +287,9 @@ class PendingRunningTransformerMixin(object):
                     continue
 
                 try:
-                    resultset = common.get_resultset(project,
-                                                     revisions_lookup,
-                                                     revision,
-                                                     missing_resultsets,
-                                                     logger)
+                    resultset = revisions_lookup[project][revision]
                 except KeyError:
-                    # There was no matching resultset, skip the job.
+                    logger.warning("skipping jobs since %s revision %s not yet ingested", project, revision)
                     continue
 
                 # using project and revision form the revision lookups

@@ -1,7 +1,6 @@
 import hashlib
 import logging
 import re
-import time
 
 import requests
 from django.conf import settings

@@ -146,81 +145,3 @@ def get_guid_root(guid):
     if "_" in str(guid):
         return str(guid).split("_", 1)[0]
     return guid
-
-
-def fetch_missing_resultsets(source, missing_resultsets, logger):
-    """
-    Schedules refetch of resultsets based on ``missing_revisions``
-    """
-    for k, v in missing_resultsets.iteritems():
-        missing_resultsets[k] = list(v)
-
-    logger.warn(
-        "Found {0} jobs with missing resultsets. Scheduling re-fetch: {1}".format(
-            source,
-            missing_resultsets
-        )
-    )
-    from treeherder.etl.tasks.cleanup_tasks import fetch_missing_push_logs
-    fetch_missing_push_logs.apply_async(
-        args=[missing_resultsets],
-        routing_key="fetch_missing_push_logs")
-
-
-def get_resultset(project, revisions_lookup, revision, missing_resultsets, logger):
-    """
-    Get the resultset out of the revisions_lookup for the given revision.
-
-    This is a little complex due to our attempts to get missing resultsets
-    in case we see jobs that, for one reason or another, we didn't get the
-    resultset from json-pushes.
-
-    This may raise a KeyError if the project or revision isn't found in the
-    lookup.. This signals that the job should be skipped
-    """
-
-    resultset_lookup = revisions_lookup[project]
-    try:
-        resultset = resultset_lookup[revision]
-
-        # we can ingest resultsets that are not active for various
-        # reasons. One would be that the data from pending/running/
-        # builds4hr may have a bad revision (from the wrong repo).
-        # in this case, we ingest the resultset as inactive so we
-        # don't keep re-trying to find it when we hit jobs like this.
-        # Or, the resultset could be inactive for other reasons.
-        # Either way, we don't want to ingest jobs for it.
-        if resultset.get("active_status", "active") != "active":
-            logger.info(("Skipping job for non-active "
-                         "resultset/revision: {0}").format(
-                revision))
-
-    except KeyError as ex:
-        # we don't have the resultset for this build/job yet
-        # we need to queue fetching that resultset
-        if revision not in ["Unknown", None]:
-            missing_resultsets[project].add(revision)
-        raise ex
-
-    return resultset
-
-
-def get_not_found_onhold_push(url, revision):
-    return {
-        "pushes": {
-            "00001": {
-                "date": int(time.time()),
-                "changesets": [
-                    {
-                        "node": revision,
-                        "tags": [],
-                        "author": "Unknown",
-                        "branch": "default",
-                        "desc": "Pushlog not found at {0}".format(url)
-                    }
-                ],
-                "user": "Unknown",
-                "active_status": "onhold"
-            }
-        }
-    }

@@ -6,8 +6,7 @@ from django.core.cache import cache
 from treeherder.client import TreeherderResultSetCollection
 from treeherder.etl import th_publisher
 from treeherder.etl.common import (fetch_json,
-                                   generate_revision_hash,
-                                   get_not_found_onhold_push)
+                                   generate_revision_hash)
 
 logger = logging.getLogger(__name__)
 

@@ -135,66 +134,3 @@ class HgPushlogProcess(HgPushlogTransformerMixin):
         cache.set("{0}:last_push_id".format(repository), last_push_id)
 
         return top_revision
-
-
-class MissingHgPushlogProcess(HgPushlogTransformerMixin):
-
-    def extract(self, url, revision):
-        logger.info("extracting missing resultsets: {0}".format(url))
-        try:
-            return fetch_json(url)
-        except requests.exceptions.HTTPError as e:
-            status_code = e.response.status_code
-            if status_code == 404:
-                # we will sometimes get here because builds4hr/pending/running have a
-                # job with a resultset that json-pushes doesn't know about. So far
-                # I have only found this to be the case when it uses a revision from
-                # the wrong repo. For example: mozilla-central, but l10n. The l10n
-                # is a separate repo, but buildbot shows it as the same. So we
-                # create this dummy resultset with ``active_status`` of ``onhold``.
-                #
-                # The effect of this is that we won't keep trying to re-fetch
-                # the bogus pushlog, but the jobs are (correctly) not shown in the
-                # UI, since they're bad data.
-                logger.warn(("no pushlog in json-pushes. generating a dummy"
-                             " onhold placeholder: {0}").format(url))
-
-                # we want to make a "dummy" resultset that is "onhold",
-                # because json-pushes doesn't know about it.
-                # This is, in effect, what TBPL does.
-                # These won't show in the UI, because they only fetch "active"
-                # resultsets
-                return get_not_found_onhold_push(url, revision)
-
-            logger.warning("HTTPError %s fetching: %s", status_code, url)
-            raise
-
-    def run(self, source_url, repository, revision):
-
-        try:
-            extracted_content = self.extract(source_url, revision)
-
-            if extracted_content:
-
-                transformed = self.transform(
-                    extracted_content['pushes'],
-                    repository
-                )
-
-                for project, coll in transformed.iteritems():
-                    logger.info("loading missing resultsets for {0}: {1}".format(
-                        project,
-                        coll.to_json()))
-
-                th_publisher.post_treeherder_collections(transformed)
-                logger.info("done loading missing resultsets for {0}".format(repository))
-            else:
-                assert extracted_content, (
-                    "Got no content response for missing resultsets: {0}".format(
-                        source_url)
-                )
-        except Exception:
-            logger.exception("error loading missing resultsets: {0}".format(
-                source_url
-            ))
-            raise

@@ -1,41 +0,0 @@
-import logging
-import urllib
-
-from celery import task
-
-from treeherder.etl.pushlog import MissingHgPushlogProcess
-from treeherder.model.models import Repository
-
-logger = logging.getLogger(__name__)
-
-
-@task(name='fetch-missing-push-logs')
-def fetch_missing_push_logs(missing_pushlogs):
-    """
-    Run several fetch_hg_push_log subtasks, one per repository
-    """
-    for repo in Repository.objects.filter(dvcs_type='hg',
-                                          active_status='active'):
-        if repo.name in missing_pushlogs:
-            # we must get them one at a time, because if ANY are missing
-            # from json-pushes, it'll return a 404 for the group.
-            for resultset in missing_pushlogs[repo.name]:
-                fetch_missing_hg_push_logs.apply_async(
-                    args=(repo.name, repo.url, resultset),
-                    routing_key='fetch_missing_push_logs')
-
-
-@task(name='fetch-missing-hg-push-logs', time_limit=3 * 60)
-def fetch_missing_hg_push_logs(repo_name, repo_url, resultset):
-    """
-    Run a HgPushlog etl process
-
-    ``revisions`` is a list of changeset values truncated to 12 chars.
-    """
-    process = MissingHgPushlogProcess()
-
-    changesetParam = urllib.urlencode({"changeset": resultset}, True)
-    url_str = repo_url + '/json-pushes/?full=1&version=2&' + changesetParam
-
-    logger.info("fetching missing resultsets: {0}".format(url_str))
-    process.run(url_str, repo_name, resultset)