Bug 1306844 - Simplify pushlog transformation during ingestion

Previously `pushes` was iterated over to build a list of transformed pushes,
which was then iterated over again to submit each push. Now the list of
pushes is iterated over only once, with each push being transformed and
stored one at a time.

In addition, each push is stored by calling `store_push()` directly, which
avoids having to pass a single-element list to `store_result_set_data()`.
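
A minimal stand-alone sketch of the new control flow, using stub functions and invented sample data in place of the real Treeherder helpers (illustrative only, not code from this commit):

def transform_push(push):
    # stand-in for HgPushlogProcess.transform_push(): build the dict to store
    return {'revision': push['changesets'][-1]['node'],
            'push_timestamp': push['date']}

def store_push(repository_name, transformed):
    # stand-in for treeherder.etl.resultset.store_push(): persist one push
    print('storing %s for %s' % (transformed['revision'], repository_name))

pushes = {
    '1': {'date': 1492700000, 'changesets': [{'node': 'aaa111'}]},
    '2': {'date': 1492700100, 'changesets': []},  # obsolete push, skipped below
    '3': {'date': 1492700200, 'changesets': [{'node': 'bbb222'}, {'node': 'ccc333'}]},
}

# One pass: transform and store each push as it is visited, rather than
# building an intermediate list of transformed pushes first.
for push in pushes.values():
    if not push['changesets']:
        continue
    store_push('mozilla-central', transform_push(push))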
Ed Morley 2017-04-20 16:09:28 +01:00
Parent 20ce1497bd
Commit f3a286709d
2 changed files with 36 additions and 58 deletions

treeherder/etl/pushlog.py

@@ -8,7 +8,7 @@ from django.core.cache import cache
 from treeherder.etl.common import (CollectionNotStoredException,
                                    fetch_json,
                                    generate_revision_hash)
-from treeherder.etl.resultset import store_result_set_data
+from treeherder.etl.resultset import store_push
 from treeherder.model.models import Repository
 logger = logging.getLogger(__name__)
@@ -21,55 +21,7 @@ def last_push_id_from_server(repo):
     return data['lastpushid']
-class HgPushlogTransformerMixin(object):
-    def transform(self, pushlog):
-        pushes = []
-        for push in pushlog.values():
-            if not push['changesets']:
-                # If a pushlog contains hidden changesets (changesets that are
-                # obsolete) then the call to json-pushes will return a push
-                # with no changesets. This was changed in bug 1286426.
-                # For us, if `changesets` is empty, we will not be able to get
-                # a revision, which is required for a resultset. So we
-                # need to just skip the push.
-                continue
-            result_set = dict()
-            result_set['push_timestamp'] = push['date']
-            result_set['revisions'] = []
-            # Author of the push/resultset
-            result_set['author'] = push['user']
-            # TODO: Remove this with Bug 1257602 is addressed
-            rev_hash_components = []
-            # iterate over the revisions
-            # we only want to ingest the last 200 revisions.
-            for change in push['changesets'][-200:]:
-                revision = {
-                    'revision': change['node'],
-                    'author': change['author'],
-                    'comment': change['desc'],
-                }
-                rev_hash_components.append(change['node'])
-                rev_hash_components.append(change['branch'])
-                # append the revision to the push
-                result_set['revisions'].append(revision)
-            result_set['revision_hash'] = generate_revision_hash(rev_hash_components)
-            result_set['revision'] = result_set["revisions"][-1]["revision"]
-            pushes.append(result_set)
-        return pushes
-class HgPushlogProcess(HgPushlogTransformerMixin):
+class HgPushlogProcess(object):
     # For more info on Mercurial Pushes, see:
     # https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/pushlog.html
@@ -80,8 +32,30 @@ class HgPushlogProcess(HgPushlogTransformerMixin):
             logger.warning("HTTPError %s fetching: %s", e.response.status_code, url)
             raise
+    def transform_push(self, push):
+        commits = []
+        # TODO: Remove this when bug 1257602 is addressed
+        rev_hash_components = []
+        # we only want to ingest the last 200 commits for each push,
+        # to protect against the 5000+ commit merges on release day uplift.
+        for commit in push['changesets'][-200:]:
+            commits.append({
+                'revision': commit['node'],
+                'author': commit['author'],
+                'comment': commit['desc'],
+            })
+            rev_hash_components.append(commit['node'])
+            rev_hash_components.append(commit['branch'])
+        return {
+            'revision': commits[-1]["revision"],
+            'revision_hash': generate_revision_hash(rev_hash_components),
+            'author': push['user'],
+            'push_timestamp': push['date'],
+            'revisions': commits,
+        }
     def run(self, source_url, repository_name, changeset=None, last_push_id=None):
         if not last_push_id:
             # get the last object seen from cache. this will
             # reduce the number of pushes processed every time
@@ -125,23 +99,27 @@ class HgPushlogProcess(HgPushlogTransformerMixin):
                                "getting all pushes" % repository_name)
         extracted_content = self.extract(source_url)
-        # ``pushes`` could be empty if there are no new ones since we last
-        # fetched
         pushes = extracted_content['pushes']
+        # `pushes` could be empty if there are no new ones since we last fetched
         if not pushes:
             return None
         last_push_id = max(map(lambda x: int(x), pushes.keys()))
         last_push = pushes[str(last_push_id)]
         top_revision = last_push["changesets"][-1]["node"]
-        transformed = self.transform(pushes)
         errors = []
         repository = Repository.objects.get(name=repository_name)
-        for push in transformed:
+        for push in pushes.values():
+            if not push['changesets']:
+                # A push without commits means it was marked as obsolete (see bug 1286426).
+                # Without them it's not possible to calculate the push revision required for ingestion.
+                continue
             try:
-                store_result_set_data(repository, [push])
+                store_push(repository, self.transform_push(push))
             except Exception:
                 newrelic.agent.record_exception()
                 errors.append({
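
For reference, the `push` dicts handled above come from json-pushes; a hypothetical example of the input shape and of what `transform_push()` returns for it (the field names are taken from the code above, the values are invented):

push = {
    'date': 1492700000,
    'user': 'sheriff@example.com',
    'changesets': [
        {'node': 'deadbeef0001', 'author': 'A Dev <adev@example.com>',
         'desc': 'Bug 1 - First change', 'branch': 'default'},
        {'node': 'deadbeef0002', 'author': 'A Dev <adev@example.com>',
         'desc': 'Bug 2 - Tip change', 'branch': 'default'},
    ],
}

# HgPushlogProcess().transform_push(push) would return roughly:
# {
#     'revision': 'deadbeef0002',                # node of the last changeset
#     'revision_hash': '<hash of the node/branch components>',
#     'author': 'sheriff@example.com',
#     'push_timestamp': 1492700000,
#     'revisions': [
#         {'revision': 'deadbeef0001', 'author': 'A Dev <adev@example.com>', 'comment': 'Bug 1 - First change'},
#         {'revision': 'deadbeef0002', 'author': 'A Dev <adev@example.com>', 'comment': 'Bug 2 - Tip change'},
#     ],
# }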

treeherder/etl/resultset.py

@@ -9,7 +9,7 @@ from treeherder.model.models import (Commit,
 logger = logging.getLogger(__name__)
-def _store_push(repository, result_set):
+def store_push(repository, result_set):
     result_set_revision = result_set.get('revision')
     if not result_set_revision:
         raise ValueError("Result set must have a revision "
@@ -67,4 +67,4 @@ def store_result_set_data(repository, result_sets):
         return
     for result_set in result_sets:
-        _store_push(repository, result_set)
+        store_push(repository, result_set)
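
A usage sketch of the two entry points after this change (not code from this commit; it assumes `repository` is a `Repository` instance and `transformed` is a dict in the shape produced by `transform_push()`):

from treeherder.etl.resultset import store_push, store_result_set_data

# store one transformed push directly, without wrapping it in a list...
store_push(repository, transformed)

# ...or keep using the list-based helper, which now delegates to store_push()
store_result_set_data(repository, [transformed])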