Bug 1312809 - Support ingesting last N pushes from pushlog

As a new contributor to Treeherder, I was confused about how to get
Treeherder to ingest several pushes. The celery worker appeared to
ingest only the last 10 pushes.

This commit enhances the "ingest_push" command to allow ingesting
the last N pushes. I've used this to ingest the last 100 pushes
to seed the database with sufficient pushlog data.
Gregory Szorc 2016-10-25 11:58:46 -07:00 committed by Gregory Szorc
Parent d2d5404100
Commit b83ab0eb46
3 changed files with 61 additions and 16 deletions

View file

@@ -114,6 +114,19 @@ talos jobs for a particular push, try:

    vagrant ~/treeherder$ ./manage.py ingest_push --filter-job-group T mozilla-inbound 63f8a47cfdf

+Ingesting a range of pushes
+---------------------------
+
+It is also possible to ingest the last N pushes for a repository:
+
+.. code-block:: bash
+
+   vagrant ~/treeherder$ ./manage.py ingest_push mozilla-central --last-n-pushes 100
+
+In this mode, only the pushlog data will be ingested: additional results
+associated with the pushes will not. This mode is useful to seed pushes so
+they are visible on the web interface and so you can easily copy and paste
+changesets from the web interface into subsequent ``ingest_push`` commands.

 .. _A-Team Bootcamp: https://ateam-bootcamp.readthedocs.io
 .. _Git: https://git-scm.com
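
The same two modes can also be driven programmatically, which is handy in
tests or one-off scripts. A minimal sketch using Django's ``call_command``
helper (this assumes a configured Django environment, e.g. inside the Vagrant
VM; the option name matches the parser changes below):

.. code-block:: python

   from django.core.management import call_command

   # Seed the database with pushlog data for the last 100 pushes.
   call_command('ingest_push', 'mozilla-central', last_n_pushes=100)

   # Then ingest jobs for a single changeset copied from the web interface.
   call_command('ingest_push', 'mozilla-central', '63f8a47cfdf')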

View file

@@ -1,3 +1,4 @@
+import logging
 from cProfile import Profile

 from django.conf import settings
@@ -7,9 +8,12 @@ from django.core.management.base import (BaseCommand,
 from treeherder.etl.buildapi import (Builds4hJobsProcess,
                                      PendingJobsProcess,
                                      RunningJobsProcess)
-from treeherder.etl.pushlog import HgPushlogProcess
+from treeherder.etl.pushlog import (HgPushlogProcess,
+                                    last_push_id_from_server)
 from treeherder.model.models import Repository

+logger = logging.getLogger(__name__)

 class Command(BaseCommand):
@@ -24,17 +28,31 @@ class Command(BaseCommand):
         parser.add_argument('--filter-job-group',
                             help="Only process jobs in specified group symbol "
                                  "(e.g. 'T')")
+        parser.add_argument('--last-n-pushes', type=int,
+                            help='fetch the last N pushes from the repository')
         parser.add_argument('project', help='repository to query')
-        parser.add_argument('changeset', help='changeset to import')
+        parser.add_argument('changeset', nargs='?', help='changeset to import')

     def _handle(self, *args, **options):
         project = options['project']
         changeset = options['changeset']

+        if not options['last_n_pushes'] and not changeset:
+            raise CommandError('must specify --last-n-pushes or a positional '
+                               'changeset argument')
+
         # get reference to repo
         repo = Repository.objects.get(name=project, active_status='active')

+        if options['last_n_pushes']:
+            last_push_id = last_push_id_from_server(repo)
+            fetch_push_id = max(1, last_push_id - options['last_n_pushes'])
+            logger.info('last server push id: %d; fetching push %d and newer'
+                        % (last_push_id, fetch_push_id))
+        else:
+            fetch_push_id = None
+
         # make sure all tasks are run synchronously / immediately
         settings.CELERY_ALWAYS_EAGER = True
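
The starting-push arithmetic just added is easy to sanity-check in isolation;
a minimal sketch with worked examples (the helper name is illustrative, not
part of the command):

# Illustrative helper, not Treeherder code: the oldest push ID to fetch,
# clamped so a young repository never requests an ID below 1.
def starting_push_id(last_push_id, n):
    return max(1, last_push_id - n)

assert starting_push_id(34500, 100) == 34400  # typical repository
assert starting_push_id(40, 100) == 1         # fewer than N pushes exist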
@@ -45,17 +63,23 @@ class Command(BaseCommand):
         process = HgPushlogProcess()
         # Use the actual push SHA, in case the changeset specified was a tag
         # or branch name (eg tip). HgPushlogProcess returns the full SHA.
-        push_sha = process.run(pushlog_url, project, changeset=changeset)
+        push_sha = process.run(pushlog_url, project, changeset=changeset,
+                               last_push_id=fetch_push_id)

-        Builds4hJobsProcess().run(project_filter=project,
-                                  revision_filter=push_sha,
-                                  job_group_filter=options['filter_job_group'])
-        PendingJobsProcess().run(project_filter=project,
-                                 revision_filter=push_sha,
-                                 job_group_filter=options['filter_job_group'])
-        RunningJobsProcess().run(project_filter=project,
-                                 revision_filter=push_sha,
-                                 job_group_filter=options['filter_job_group'])
+        # Only perform additional processing if fetching a single changeset
+        # because we only have the sha1 of the tip-most push in "last N pushes"
+        # mode and can't filter appropriately.
+        if not fetch_push_id:
+            group_filter = options['filter_job_group']
+            Builds4hJobsProcess().run(project_filter=project,
+                                      revision_filter=push_sha,
+                                      job_group_filter=group_filter)
+            PendingJobsProcess().run(project_filter=project,
+                                     revision_filter=push_sha,
+                                     job_group_filter=group_filter)
+            RunningJobsProcess().run(project_filter=project,
+                                     revision_filter=push_sha,
+                                     job_group_filter=group_filter)

     def handle(self, *args, **options):
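
The guard above deserves a note: in "last N pushes" mode, process.run returns
only the SHA of the tip-most push, so passing it as revision_filter would
silently drop jobs for every older push in the range. A minimal sketch of the
decision (run_job_processes is a hypothetical stand-in for the three
*JobsProcess().run(...) calls, not a Treeherder API):

# Hypothetical stand-in, not Treeherder code.
def ingest_jobs_if_single_push(fetch_push_id, push_sha, run_job_processes):
    if fetch_push_id:
        # last-N mode: push_sha identifies only the tip-most push, so a
        # revision filter would be wrong for the rest of the range.
        return
    run_job_processes(revision_filter=push_sha)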

View file

@@ -14,6 +14,13 @@ from treeherder.model.derived.jobs import JobsModel
 logger = logging.getLogger(__name__)

+def last_push_id_from_server(repo):
+    """Obtain the last push ID from a ``Repository`` instance."""
+    url = '%s/json-pushes/?version=2' % repo.url
+    data = fetch_json(url)
+    return data['lastpushid']
+
 class HgPushlogTransformerMixin(object):

     def transform(self, pushlog, repository):
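
A quick way to see what last_push_id_from_server consumes: only the top-level
lastpushid key of the version=2 payload. A standalone sketch with stubbed-out
pieces (the stub types and the canned payload are assumptions for
illustration):

# Illustrative stubs, not Treeherder code: exercise the helper's logic
# against a canned json-pushes?version=2 style payload.
class FakeRepo(object):
    url = 'https://hg.mozilla.org/mozilla-central'

def fetch_json(url):
    # stand-in for the module's fetch_json helper
    return {'lastpushid': 34500}

def last_push_id_from_server(repo):
    url = '%s/json-pushes/?version=2' % repo.url
    return fetch_json(url)['lastpushid']

assert last_push_id_from_server(FakeRepo()) == 34500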
@@ -83,11 +90,12 @@ class HgPushlogProcess(HgPushlogTransformerMixin):
             logger.warning("HTTPError %s fetching: %s", e.response.status_code, url)
             raise

-    def run(self, source_url, repository, changeset=None):
-
-        # get the last object seen from cache. this will
-        # reduce the number of pushes processed every time
-        last_push_id = cache.get("{0}:last_push_id".format(repository))
+    def run(self, source_url, repository, changeset=None, last_push_id=None):
+        if not last_push_id:
+            # get the last object seen from cache. this will
+            # reduce the number of pushes processed every time
+            last_push_id = cache.get("{0}:last_push_id".format(repository))

         if not changeset and last_push_id:
             startid_url = "{}&startID={}".format(source_url, last_push_id)
             logger.info("Extracted last push for '%s', '%s', from cache,