diff --git a/.dependabot/config.yml b/.dependabot/config.yml
index 7753fc9dc..798d28afe 100644
--- a/.dependabot/config.yml
+++ b/.dependabot/config.yml
@@ -1,26 +1,11 @@
 # https://dependabot.com/blog/introducing-config-files/
 version: 1
 update_configs:
-  - package_manager: "python"
-    directory: "/requirements"
-    update_schedule: "live"
+  - package_manager: 'python'
+    directory: '/requirements'
+    update_schedule: 'live'
     default_reviewers:
-      - "camd"
+      - 'camd'
     default_labels:
-      - "dependencies"
-      - "python"
-    ignored_updates:
-      # Celery/kombu/amqp/billiard must be updated all at the same time,
-      # and the update is blocked on resolving bug 1337717.
-      - match:
-          dependency_name: "celery"
-          version_requirement: ">=4"
-      - match:
-          dependency_name: "kombu"
-          version_requirement: ">=4"
-      - match:
-          dependency_name: "amqp"
-          version_requirement: ">=2"
-      - match:
-          dependency_name: "billiard"
-          version_requirement: ">=3.4"
+      - 'dependencies'
+      - 'python'
diff --git a/lints/queuelint.py b/lints/queuelint.py
index 4dc3f942e..5e3b70f3f 100755
--- a/lints/queuelint.py
+++ b/lints/queuelint.py
@@ -20,7 +20,7 @@
 settings_queues = set()
 queues_list = None
 for item in code.body:
-    if isinstance(item, ast.Assign) and item.targets[0].id == "CELERY_QUEUES":
+    if isinstance(item, ast.Assign) and item.targets[0].id == "CELERY_TASK_QUEUES":
         queues_list = item.value
 
 if queues_list is None:
diff --git a/requirements/common.txt b/requirements/common.txt
index b5894b8a0..cc661a51a 100644
--- a/requirements/common.txt
+++ b/requirements/common.txt
@@ -42,11 +42,9 @@ Brotli==1.0.7 \
 
 Django==2.1.7 \
     --hash=sha256:275bec66fd2588dd517ada59b8bfb23d4a9abc5a362349139ddda3c7ff6f5ade
 
-celery==3.1.26.post2 \
-    --hash=sha256:60211897aee321266ff043fe2b33eaac825dfe9f46843cf964fc97507a186334 \
-    --hash=sha256:5493e172ae817b81ba7d09443ada114886765a8ce02f16a56e6fac68d953a9b2
+celery==4.3.0rc1 --hash=sha256:62cdf98af78278202b8a3ba1b8215c87ab420eae7eebf0b7ba27b16ebc9f148e
-kombu==3.0.37 --hash=sha256:7ceab743e3e974f3e5736082e8cc514c009e254e646d6167342e0e192aee81a6
+kombu==4.3.0 --hash=sha256:7a2cbed551103db9a4e2efafe9b63222e012a61a18a881160ad797b9d4e1d0a1
 
 simplejson==3.16.0 \
     --hash=sha256:6c3258ffff58712818a233b9737fe4be943d306c40cf63d14ddc82ba563f483a \
@@ -79,14 +77,16 @@ mysqlclient==1.4.2.post1 \
     --hash=sha256:f257d250f2675d0ef99bd318906f3cfc05cef4a2f385ea695ff32a3f04b9f9a7
 # Required by celery
-billiard==3.3.0.23 --hash=sha256:692a2a5a55ee39a42bcb7557930e2541da85df9ea81c6e24827f63b80cd39d0b
+billiard==3.6.0.0 --hash=sha256:756bf323f250db8bf88462cd042c992ba60d8f5e07fc5636c24ba7d6f4261d84
 
 pytz==2018.9 \
     --hash=sha256:32b0891edff07e28efe91284ed9c31e123d84bea3fd98e1f72be2508f43ef8d9 \
     --hash=sha256:d5f05e487007e29e03409f9398d074e158d920d36eb82eaf66fb1136b0c5374c
 
 # Required by kombu
-amqp==1.4.9 --hash=sha256:e0ed0ce6b8ffe5690a2e856c7908dc557e0e605283d6885dd1361d79f2928908
-anyjson==0.3.3 --hash=sha256:37812d863c9ad3e35c0734c42e0bf0320ce8c3bed82cd20ad54cb34d158157ba
+amqp==2.4.1 --hash=sha256:16056c952e8029ce8db097edf0d7c2fe2ba9de15d30ba08aee2c5221273d8e23
+
+# Required by amqp
+vine==1.2.0 --hash=sha256:3cd505dcf980223cfaf13423d371f2e7ff99247e38d5985a01ec8264e4f2aca1
 
 # Required by mozlog
 mozterm==1.0.0 \
diff --git a/runtests.sh b/runtests.sh
index 8778ab8d9..3660fb676 100755
--- a/runtests.sh
+++ b/runtests.sh
@@ -6,7 +6,7 @@ set -euo pipefail
 echo "Running pip check"
 pip check
 
-echo "Checking CELERY_QUEUES matches Procfile"
+echo "Checking CELERY_TASK_QUEUES matches Procfile"
 python -bb ./lints/queuelint.py
 
 echo "Running flake8"
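For context on the `queuelint.py` change above: the lint parses the settings module with `ast`, looks for the top-level `CELERY_TASK_QUEUES` assignment, and compares the queue names it finds against the workers defined in `Procfile`. A minimal standalone sketch of that AST technique (the inline `source` string here is illustrative, not the real settings file):

```python
import ast

# Illustrative stand-in for treeherder/config/settings.py.
source = """
CELERY_TASK_QUEUES = [
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('log_parser', Exchange('default'), routing_key='log_parser.normal'),
]
"""

code = ast.parse(source)

# Find the top-level `CELERY_TASK_QUEUES = [...]` assignment, as queuelint does.
queues_list = None
for item in code.body:
    if isinstance(item, ast.Assign) and item.targets[0].id == "CELERY_TASK_QUEUES":
        queues_list = item.value

# Each element is a `Queue('name', ...)` call; the queue name is the first
# positional argument, a string literal.
settings_queues = {ast.literal_eval(call.args[0]) for call in queues_list.elts}
print(sorted(settings_queues))  # ['default', 'log_parser']
```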
diff --git a/tests/conftest.py b/tests/conftest.py
index dab1dd6a9..272c5d696 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -287,7 +287,7 @@ def pulse_connection():
     This is a non-lazy mirror of our Pulse service's build_connection as
     explained in: https://bugzilla.mozilla.org/show_bug.cgi?id=1484196
     """
-    return kombu.Connection(settings.BROKER_URL)
+    return kombu.Connection(settings.CELERY_BROKER_URL)
 
 
 @pytest.fixture
diff --git a/tests/settings.py b/tests/settings.py
index de94d43a9..bb01cb18f 100644
--- a/tests/settings.py
+++ b/tests/settings.py
@@ -6,8 +6,8 @@ KEY_PREFIX = 'test'
 TREEHERDER_TEST_REPOSITORY_NAME = 'test_treeherder_jobs'
 
 # this makes celery calls synchronous, useful for unit testing
-CELERY_ALWAYS_EAGER = True
-CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
+CELERY_TASK_ALWAYS_EAGER = True
+CELERY_TASK_EAGER_PROPAGATES = True
 
 # Make WhiteNoise look for static assets inside registered Django apps, rather
 # than only inside the generated staticfiles directory. This means we don't
diff --git a/treeherder/__init__.py b/treeherder/__init__.py
index b6fc8176d..15d7c5085 100644
--- a/treeherder/__init__.py
+++ b/treeherder/__init__.py
@@ -1,3 +1,5 @@
 # This will make sure the app is always imported when
 # Django starts so that shared_task will use this app.
-from .celery import app as celery_app  # noqa
+from .celery import app as celery_app
+
+__all__ = ('celery_app',)
diff --git a/treeherder/celery.py b/treeherder/celery.py
index 36c7b1868..8e301816a 100644
--- a/treeherder/celery.py
+++ b/treeherder/celery.py
@@ -1,14 +1,17 @@
 import os
 
 from celery import Celery
-from django.conf import settings
 
 # set the default Django settings module for the 'celery' program.
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'treeherder.config.settings')
 
 app = Celery('treeherder')
 
-# Using a string here means the worker will not have to
-# pickle the object when using Windows.
-app.config_from_object('django.conf:settings')
-app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
+# Using a string here means the worker doesn't have to serialize
+# the configuration object to child processes.
+# - namespace='CELERY' means all celery-related configuration keys
+#   should have a `CELERY_` prefix.
+app.config_from_object('django.conf:settings', namespace='CELERY')
+
+# Load task modules from all registered Django app configs.
+app.autodiscover_tasks()
diff --git a/treeherder/config/settings.py b/treeherder/config/settings.py
index 00ee19560..b158030e4 100644
--- a/treeherder/config/settings.py
+++ b/treeherder/config/settings.py
@@ -263,7 +263,13 @@ AUTH0_DOMAIN = env('AUTH0_DOMAIN', default="auth.mozilla.auth0.com")
 AUTH0_CLIENTID = env('AUTH0_CLIENTID', default="q8fZZFfGEmSB2c5uSI8hOkKdDGXnlo5z")
 
 # Celery
-CELERY_QUEUES = [
+
+# TODO: Replace the use of different log parser queues for failures vs not with the
+# RabbitMQ priority feature (since the idea behind separate queues was only to ensure
+# failures are dealt with first if there is a backlog). After that it should be possible
+# to simplify the queue configuration, by using the recommended CELERY_TASK_ROUTES instead:
+# http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing
+CELERY_TASK_QUEUES = [
     Queue('default', Exchange('default'), routing_key='default'),
     Queue('log_parser', Exchange('default'), routing_key='log_parser.normal'),
     Queue('log_parser_fail', Exchange('default'), routing_key='log_parser.failures'),
@@ -276,39 +282,36 @@ CELERY_QUEUES = [
     Queue('seta_analyze_failures', Exchange('default'), routing_key='seta_analyze_failures'),
 ]
 
+# Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos
+# and so that `lints/queuelint.py` can check a corresponding worker exists in `Procfile`.
+CELERY_TASK_CREATE_MISSING_QUEUES = False
+
 # Celery broker setup
-BROKER_URL = env('BROKER_URL')
+CELERY_BROKER_URL = env('BROKER_URL')
 
 # Force Celery to use TLS when appropriate (ie if not localhost),
-# rather than relying on `BROKER_URL` having `amqps://` or `?ssl=` set.
+# rather than relying on `CELERY_BROKER_URL` having `amqps://` or `?ssl=` set.
 # This is required since CloudAMQP's automatically defined URL uses neither.
-if connection_should_use_tls(BROKER_URL):
-    BROKER_USE_SSL = True
+if connection_should_use_tls(CELERY_BROKER_URL):
+    CELERY_BROKER_USE_SSL = True
 
 # Recommended by CloudAMQP:
 # https://www.cloudamqp.com/docs/celery.html
-BROKER_CONNECTION_TIMEOUT = 30
-BROKER_HEARTBEAT = None
-CELERY_ACCEPT_CONTENT = ['json']
-CELERY_EVENT_QUEUE_EXPIRES = 60
-CELERY_IGNORE_RESULT = True
-CELERY_RESULT_BACKEND = None
-CELERY_RESULT_SERIALIZER = 'json'
-CELERY_SEND_EVENTS = False
-CELERY_TASK_SERIALIZER = 'json'
+# Raise timeout from default of 4s, in case of Linux DNS timeouts etc.
+CELERY_BROKER_CONNECTION_TIMEOUT = 30
+# Disable heartbeats since CloudAMQP uses TCP keep-alive instead.
+CELERY_BROKER_HEARTBEAT = None
 
 # default value when no task routing info is specified
-CELERY_DEFAULT_QUEUE = 'default'
-CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
-CELERY_DEFAULT_ROUTING_KEY = 'default'
+CELERY_TASK_DEFAULT_QUEUE = 'default'
 
 # Default celery time limits in seconds. The gap between the soft and hard time limit
 # is to give the New Relic agent time to report the `SoftTimeLimitExceeded` exception.
-# NB: The per-task `soft_time_limit` must always be lower than `CELERYD_TASK_TIME_LIMIT`.
-CELERYD_TASK_SOFT_TIME_LIMIT = 15 * 60
-CELERYD_TASK_TIME_LIMIT = CELERYD_TASK_SOFT_TIME_LIMIT + 30
+# NB: The per-task `soft_time_limit` must always be lower than `CELERY_TASK_TIME_LIMIT`.
+CELERY_TASK_SOFT_TIME_LIMIT = 15 * 60
+CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT + 30
 
-CELERYBEAT_SCHEDULE = {
+CELERY_BEAT_SCHEDULE = {
     # this is just a failsafe in case the Pulse ingestion misses something
     'fetch-push-logs-every-5-minutes': {
         'task': 'fetch-push-logs',
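For context on the `namespace='CELERY'` change in `treeherder/celery.py` and the renamed settings above: with a namespace, Celery 4 reads only the `CELERY_`-prefixed attributes of the configuration object and maps each onto its lowercase setting name, so `CELERY_TASK_ALWAYS_EAGER` becomes `task_always_eager`, `CELERY_BROKER_URL` becomes `broker_url`, and so on. A minimal self-contained sketch of that mapping (the stub class stands in for a Django settings module):

```python
from celery import Celery

class SettingsStub:
    """Illustrative stand-in for a Django settings module."""
    CELERY_BROKER_URL = 'amqp://guest@localhost//'
    CELERY_TASK_ALWAYS_EAGER = True
    CELERY_TASK_DEFAULT_QUEUE = 'default'
    SOME_UNRELATED_SETTING = 'ignored'  # no CELERY_ prefix, so Celery skips it

app = Celery('sketch')
# Only CELERY_-prefixed attributes are read; each maps onto the lowercase
# Celery 4 setting name with the prefix stripped.
app.config_from_object(SettingsStub, namespace='CELERY')

print(app.conf.broker_url)          # amqp://guest@localhost//
print(app.conf.task_always_eager)   # True
print(app.conf.task_default_queue)  # default
```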
diff --git a/treeherder/etl/jobs.py b/treeherder/etl/jobs.py
index 3cb0dc10a..6e6c12ba7 100644
--- a/treeherder/etl/jobs.py
+++ b/treeherder/etl/jobs.py
@@ -358,12 +358,17 @@ def _schedule_log_parsing(job, job_logs, result):
 
         job_log_ids.append(job_log.id)
 
+    # TODO: Replace the use of different queues for failures vs not with the
+    # RabbitMQ priority feature (since the idea behind separate queues was
+    # only to ensure failures are dealt with first if there is a backlog).
     if result != 'success':
+        queue = 'log_parser_fail'
        priority = 'failures'
     else:
+        queue = 'log_parser'
         priority = "normal"
 
-    parse_logs.apply_async(routing_key="log_parser.%s" % priority,
+    parse_logs.apply_async(queue=queue,
                            args=[job.id, job_log_ids, priority])
diff --git a/treeherder/etl/management/commands/ingest_push.py b/treeherder/etl/management/commands/ingest_push.py
index 23c5a9214..a34110076 100644
--- a/treeherder/etl/management/commands/ingest_push.py
+++ b/treeherder/etl/management/commands/ingest_push.py
@@ -56,7 +56,7 @@ class Command(BaseCommand):
         fetch_push_id = None
 
         # make sure all tasks are run synchronously / immediately
-        settings.CELERY_ALWAYS_EAGER = True
+        settings.CELERY_TASK_ALWAYS_EAGER = True
 
         # get hg pushlog
         pushlog_url = '%s/json-pushes/?full=1&version=2' % repo.url
diff --git a/treeherder/etl/perf.py b/treeherder/etl/perf.py
index fca4d1264..865c9a165 100644
--- a/treeherder/etl/perf.py
+++ b/treeherder/etl/perf.py
@@ -130,7 +130,7 @@ def _load_perf_datum(job, perf_datum):
         if signature.should_alert is not False and datum_created and \
            job.repository.performance_alerts_enabled:
             generate_alerts.apply_async(args=[signature.id],
-                                        routing_key='generate_perf_alerts')
+                                        queue='generate_perf_alerts')
 
         for subtest in suite['subtests']:
             subtest_properties = {
@@ -188,7 +188,7 @@
             suite.get('value') is None)) and
                 datum_created and job.repository.performance_alerts_enabled):
             generate_alerts.apply_async(args=[signature.id],
-                                        routing_key='generate_perf_alerts')
+                                        queue='generate_perf_alerts')
 
 
 def store_performance_artifact(job, artifact):
diff --git a/treeherder/etl/tasks/pushlog_tasks.py b/treeherder/etl/tasks/pushlog_tasks.py
index e97476042..4d347854c 100644
--- a/treeherder/etl/tasks/pushlog_tasks.py
+++ b/treeherder/etl/tasks/pushlog_tasks.py
@@ -14,7 +14,7 @@ def fetch_push_logs():
                                           active_status="active"):
         fetch_hg_push_log.apply_async(
             args=(repo.name, repo.url),
-            routing_key='pushlog'
+            queue='pushlog'
         )
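These call sites switch from `routing_key=` to `queue=`: with `CELERY_TASK_CREATE_MISSING_QUEUES = False`, naming a queue that is not declared in `CELERY_TASK_QUEUES` fails loudly instead of silently creating one. The TODO comments point at Celery's static routing as the eventual simplification, under which the queue choice would move out of the call sites and into settings. A sketch only, not part of this change; the task-name-to-queue pairs below are illustrative:

```python
# Sketch of the CELERY_TASK_ROUTES style referenced in the TODO comments.
# Routes map task names to queues, so apply_async() callers could drop queue=.
CELERY_TASK_ROUTES = {
    'fetch-push-logs': {'queue': 'pushlog'},
    'store-pulse-jobs': {'queue': 'store_pulse_jobs'},
    'store-pulse-pushes': {'queue': 'store_pulse_pushes'},
    'generate-alerts': {'queue': 'generate_perf_alerts'},
}
```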
diff --git a/treeherder/log_parser/tasks.py b/treeherder/log_parser/tasks.py
index da9e26e17..ded7d3923 100644
--- a/treeherder/log_parser/tasks.py
+++ b/treeherder/log_parser/tasks.py
@@ -79,9 +79,11 @@ def parse_logs(job_id, job_log_ids, priority):
 
         if success:
             logger.debug("Scheduling autoclassify for job %i", job_id)
-            autoclassify.apply_async(
-                args=[job_id],
-                routing_key="autoclassify.%s" % priority)
+            # TODO: Replace the use of different queues for failures vs not with the
+            # RabbitMQ priority feature (since the idea behind separate queues was
+            # only to ensure failures are dealt with first if there is a backlog).
+            queue = 'log_autoclassify_fail' if priority == 'failures' else 'log_autoclassify'
+            autoclassify.apply_async(args=[job_id], queue=queue)
         else:
             job.autoclassify_status = Job.SKIPPED
     else:
diff --git a/treeherder/services/pulse/consumers.py b/treeherder/services/pulse/consumers.py
index 7929f08b4..c8bc8153c 100644
--- a/treeherder/services/pulse/consumers.py
+++ b/treeherder/services/pulse/consumers.py
@@ -98,7 +98,7 @@ class JobConsumer(PulseConsumer):
         logger.info('received job message from %s#%s', exchange, routing_key)
         store_pulse_jobs.apply_async(
             args=[body, exchange, routing_key],
-            routing_key='store_pulse_jobs'
+            queue='store_pulse_jobs'
         )
         message.ack()
 
@@ -112,7 +112,7 @@ class PushConsumer(PulseConsumer):
         logger.info('received push message from %s#%s', exchange, routing_key)
         store_pulse_pushes.apply_async(
             args=[body, exchange, routing_key],
-            routing_key='store_pulse_pushes'
+            queue='store_pulse_pushes'
         )
         message.ack()
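One consequence worth noting of `CELERY_TASK_CREATE_MISSING_QUEUES = False` combined with the explicit `queue=` arguments throughout this diff: a typo'd queue name now raises at publish time rather than creating an orphan queue no worker consumes. A minimal sketch, assuming a RabbitMQ broker on localhost (task and queue names illustrative):

```python
from celery import Celery
from celery.exceptions import QueueNotFound
from kombu import Exchange, Queue

app = Celery('sketch', broker='amqp://guest@localhost//')
app.conf.task_create_missing_queues = False
app.conf.task_queues = [
    Queue('log_parser', Exchange('default'), routing_key='log_parser.normal'),
]

@app.task(name='parse-logs-sketch')
def parse_logs(job_id):
    return job_id

# Routing to a declared queue works as expected.
parse_logs.apply_async(args=[1], queue='log_parser')

try:
    parse_logs.apply_async(args=[1], queue='log_parsre')  # typo'd queue name
except QueueNotFound as exc:
    print(exc)  # Queue 'log_parsre' missing from task_queues
```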