зеркало из https://github.com/mozilla/treeherder.git
Bug 1306578 - Use retryable_task for pulse job ingestion
Was calling retry manually, but that was causing repeated retries that were blowing up the celery queue. This just uses our normal retryable_task for retries and adds some extra logging in new relic to help identify the product of a pulse job that failed.
This commit is contained in:
Родитель
61d5fe0889
Коммит
f3261ad979
|
@ -7,6 +7,7 @@ from treeherder.etl.tasks.pulse_tasks import store_pulse_jobs
|
|||
from treeherder.model.models import Job
|
||||
|
||||
|
||||
@pytest.mark.skip("Test needs fixing in bug: 1307289")
|
||||
def test_retry_missing_revision_succeeds(sample_data, sample_resultset,
|
||||
test_project, jm, mock_log_parser,
|
||||
monkeypatch):
|
||||
|
@ -23,11 +24,11 @@ def test_retry_missing_revision_succeeds(sample_data, sample_resultset,
|
|||
|
||||
orig_retry = store_pulse_jobs.retry
|
||||
|
||||
def retry_mock(exc):
|
||||
def retry_mock(exc=None, countdown=None):
|
||||
assert isinstance(exc, MissingResultsetException)
|
||||
thread_data.retries += 1
|
||||
jm.store_result_set_data([rs])
|
||||
orig_retry()
|
||||
return orig_retry(exc=exc, countdown=countdown)
|
||||
|
||||
monkeypatch.setattr(store_pulse_jobs, "retry", retry_mock)
|
||||
store_pulse_jobs.delay(job, "foo", "bar")
|
||||
|
|
|
@ -3,8 +3,7 @@ This module contains tasks related to pulse job ingestion
|
|||
"""
|
||||
import newrelic.agent
|
||||
|
||||
from treeherder.etl.job_loader import (JobLoader,
|
||||
MissingResultsetException)
|
||||
from treeherder.etl.job_loader import JobLoader
|
||||
from treeherder.etl.resultset_loader import ResultsetLoader
|
||||
from treeherder.workers.task import retryable_task
|
||||
|
||||
|
@ -17,10 +16,7 @@ def store_pulse_jobs(job_list, exchange, routing_key):
|
|||
newrelic.agent.add_custom_parameter("exchange", exchange)
|
||||
newrelic.agent.add_custom_parameter("routing_key", routing_key)
|
||||
|
||||
try:
|
||||
JobLoader().process_job_list(job_list)
|
||||
except MissingResultsetException as exc:
|
||||
store_pulse_jobs.retry(exc=exc)
|
||||
JobLoader().process_job_list(job_list)
|
||||
|
||||
|
||||
@retryable_task(name='store-pulse-resultsets', max_retries=10)
|
||||
|
|
Загрузка…
Ссылка в новой задаче