Bug 1337717 - Update to newer Celery, Kombu, py-amqp and billiard (#4722)

Updating in one go, since the Celery 4 release only supports the newer
versions of its dependencies and vice versa.

Of note, this fixes the unhelpful connection error messages shown when
in fact there was an authentication problem, and brings Celery/Kombu
support for Python 3.7. It's also likely that this will fix the pulse
listener hang seen in bug 1529404.

The new Celery release has renamed a number of the settings.
Most changes were performed by running:

```
celery upgrade settings treeherder/config/settings.py --django
celery upgrade settings tests/settings.py --django
```

The Django integration in celery.py has been cleaned up by following:
https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html

The bug being hit that caused this to be reverted back in #2119/bug 1333079
was due to Celery 4 no longer supporting calling `apply_async()`
with just the `routing_key` - it now has to be called with either just
the `queue`, or else both the `routing_key` and `queue`, otherwise the task
ends up in the `default` queue. Sadly this isn't mentioned in the Celery
breaking changes list - I'll file an upstream issue shortly.

Changes:

http://docs.celeryproject.org/en/master/history/whatsnew-4.0.html
http://docs.celeryproject.org/en/master/changelog.html#rc1
https://github.com/celery/celery/compare/v3.1.26...v4.3.0rc1

http://docs.celeryproject.org/projects/kombu/en/stable/changelog.html#version-4-3-0
https://github.com/celery/kombu/compare/v3.0.37...v4.3.0

https://amqp.readthedocs.io/en/stable/changelog.html
https://github.com/celery/py-amqp/compare/v1.4.9...v2.4.1

https://github.com/celery/billiard/blob/v3.6.0/CHANGES.txt
https://github.com/celery/billiard/compare/v3.3.0.23...v3.6.0
This commit is contained in:
Ed Morley 2019-02-28 19:52:22 +00:00 коммит произвёл GitHub
Родитель 03c94bf286
Коммит 0809a5bd57
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 70 добавлений и 70 удалений

Просмотреть файл

@@ -1,26 +1,11 @@
# https://dependabot.com/blog/introducing-config-files/
version: 1
update_configs:
- package_manager: "python"
directory: "/requirements"
update_schedule: "live"
- package_manager: 'python'
directory: '/requirements'
update_schedule: 'live'
default_reviewers:
- "camd"
- 'camd'
default_labels:
- "dependencies"
- "python"
ignored_updates:
# Celery/kombu/amqp/billiard must be updated all at the same time,
# and the update is blocked on resolving bug 1337717.
- match:
dependency_name: "celery"
version_requirement: ">=4"
- match:
dependency_name: "kombu"
version_requirement: ">=4"
- match:
dependency_name: "amqp"
version_requirement: ">=2"
- match:
dependency_name: "billiard"
version_requirement: ">=3.4"
- 'dependencies'
- 'python'

Просмотреть файл

@@ -20,7 +20,7 @@ settings_queues = set()
queues_list = None
for item in code.body:
if isinstance(item, ast.Assign) and item.targets[0].id == "CELERY_QUEUES":
if isinstance(item, ast.Assign) and item.targets[0].id == "CELERY_TASK_QUEUES":
queues_list = item.value
if queues_list is None:

Просмотреть файл

@@ -42,11 +42,9 @@ Brotli==1.0.7 \
Django==2.1.7 \
--hash=sha256:275bec66fd2588dd517ada59b8bfb23d4a9abc5a362349139ddda3c7ff6f5ade
celery==3.1.26.post2 \
--hash=sha256:60211897aee321266ff043fe2b33eaac825dfe9f46843cf964fc97507a186334 \
--hash=sha256:5493e172ae817b81ba7d09443ada114886765a8ce02f16a56e6fac68d953a9b2
celery==4.3.0rc1 --hash=sha256:62cdf98af78278202b8a3ba1b8215c87ab420eae7eebf0b7ba27b16ebc9f148e
kombu==3.0.37 --hash=sha256:7ceab743e3e974f3e5736082e8cc514c009e254e646d6167342e0e192aee81a6
kombu==4.3.0 --hash=sha256:7a2cbed551103db9a4e2efafe9b63222e012a61a18a881160ad797b9d4e1d0a1
simplejson==3.16.0 \
--hash=sha256:6c3258ffff58712818a233b9737fe4be943d306c40cf63d14ddc82ba563f483a \
@@ -79,14 +77,16 @@ mysqlclient==1.4.2.post1 \
--hash=sha256:f257d250f2675d0ef99bd318906f3cfc05cef4a2f385ea695ff32a3f04b9f9a7
# Required by celery
billiard==3.3.0.23 --hash=sha256:692a2a5a55ee39a42bcb7557930e2541da85df9ea81c6e24827f63b80cd39d0b
billiard==3.6.0.0 --hash=sha256:756bf323f250db8bf88462cd042c992ba60d8f5e07fc5636c24ba7d6f4261d84
pytz==2018.9 \
--hash=sha256:32b0891edff07e28efe91284ed9c31e123d84bea3fd98e1f72be2508f43ef8d9 \
--hash=sha256:d5f05e487007e29e03409f9398d074e158d920d36eb82eaf66fb1136b0c5374c
# Required by kombu
amqp==1.4.9 --hash=sha256:e0ed0ce6b8ffe5690a2e856c7908dc557e0e605283d6885dd1361d79f2928908
anyjson==0.3.3 --hash=sha256:37812d863c9ad3e35c0734c42e0bf0320ce8c3bed82cd20ad54cb34d158157ba
amqp==2.4.1 --hash=sha256:16056c952e8029ce8db097edf0d7c2fe2ba9de15d30ba08aee2c5221273d8e23
# Required by amqp
vine==1.2.0 --hash=sha256:3cd505dcf980223cfaf13423d371f2e7ff99247e38d5985a01ec8264e4f2aca1
# Required by mozlog
mozterm==1.0.0 \

Просмотреть файл

@@ -6,7 +6,7 @@ set -euo pipefail
echo "Running pip check"
pip check
echo "Checking CELERY_QUEUES matches Procfile"
echo "Checking CELERY_TASK_QUEUES matches Procfile"
python -bb ./lints/queuelint.py
echo "Running flake8"

Просмотреть файл

@@ -287,7 +287,7 @@ def pulse_connection():
This is a non-lazy mirror of our Pulse service's build_connection as
explained in: https://bugzilla.mozilla.org/show_bug.cgi?id=1484196
"""
return kombu.Connection(settings.BROKER_URL)
return kombu.Connection(settings.CELERY_BROKER_URL)
@pytest.fixture

Просмотреть файл

@@ -6,8 +6,8 @@ KEY_PREFIX = 'test'
TREEHERDER_TEST_REPOSITORY_NAME = 'test_treeherder_jobs'
# this makes celery calls synchronous, useful for unit testing
CELERY_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True
# Make WhiteNoise look for static assets inside registered Django apps, rather
# than only inside the generated staticfiles directory. This means we don't

Просмотреть файл

@@ -1,3 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa
from .celery import app as celery_app
__all__ = ('celery_app',)

Просмотреть файл

@@ -1,14 +1,17 @@
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'treeherder.config.settings')
app = Celery('treeherder')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Просмотреть файл

@@ -263,7 +263,13 @@ AUTH0_DOMAIN = env('AUTH0_DOMAIN', default="auth.mozilla.auth0.com")
AUTH0_CLIENTID = env('AUTH0_CLIENTID', default="q8fZZFfGEmSB2c5uSI8hOkKdDGXnlo5z")
# Celery
CELERY_QUEUES = [
# TODO: Replace the use of different log parser queues for failures vs not with the
# RabbitMQ priority feature (since the idea behind separate queues was only to ensure
# failures are dealt with first if there is a backlog). After that it should be possible
# to simplify the queue configuration, by using the recommended CELERY_TASK_ROUTES instead:
# http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing
CELERY_TASK_QUEUES = [
Queue('default', Exchange('default'), routing_key='default'),
Queue('log_parser', Exchange('default'), routing_key='log_parser.normal'),
Queue('log_parser_fail', Exchange('default'), routing_key='log_parser.failures'),
@@ -276,39 +282,36 @@ CELERY_QUEUES = [
Queue('seta_analyze_failures', Exchange('default'), routing_key='seta_analyze_failures'),
]
# Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos
# and so that `lints/queuelint.py` can check a corresponding worker exists in `Procfile`.
CELERY_TASK_CREATE_MISSING_QUEUES = False
# Celery broker setup
BROKER_URL = env('BROKER_URL')
CELERY_BROKER_URL = env('BROKER_URL')
# Force Celery to use TLS when appropriate (ie if not localhost),
# rather than relying on `BROKER_URL` having `amqps://` or `?ssl=` set.
# rather than relying on `CELERY_BROKER_URL` having `amqps://` or `?ssl=` set.
# This is required since CloudAMQP's automatically defined URL uses neither.
if connection_should_use_tls(BROKER_URL):
BROKER_USE_SSL = True
if connection_should_use_tls(CELERY_BROKER_URL):
CELERY_BROKER_USE_SSL = True
# Recommended by CloudAMQP:
# https://www.cloudamqp.com/docs/celery.html
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = None
CELERY_ACCEPT_CONTENT = ['json']
CELERY_EVENT_QUEUE_EXPIRES = 60
CELERY_IGNORE_RESULT = True
CELERY_RESULT_BACKEND = None
CELERY_RESULT_SERIALIZER = 'json'
CELERY_SEND_EVENTS = False
CELERY_TASK_SERIALIZER = 'json'
# Raise timeout from default of 4s, in case of Linux DNS timeouts etc.
CELERY_BROKER_CONNECTION_TIMEOUT = 30
# Disable heartbeats since CloudAMQP uses TCP keep-alive instead.
CELERY_BROKER_HEARTBEAT = None
# default value when no task routing info is specified
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# Default celery time limits in seconds. The gap between the soft and hard time limit
# is to give the New Relic agent time to report the `SoftTimeLimitExceeded` exception.
# NB: The per-task `soft_time_limit` must always be lower than `CELERYD_TASK_TIME_LIMIT`.
CELERYD_TASK_SOFT_TIME_LIMIT = 15 * 60
CELERYD_TASK_TIME_LIMIT = CELERYD_TASK_SOFT_TIME_LIMIT + 30
# NB: The per-task `soft_time_limit` must always be lower than `CELERY_TASK_TIME_LIMIT`.
CELERY_TASK_SOFT_TIME_LIMIT = 15 * 60
CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT + 30
CELERYBEAT_SCHEDULE = {
CELERY_BEAT_SCHEDULE = {
# this is just a failsafe in case the Pulse ingestion misses something
'fetch-push-logs-every-5-minutes': {
'task': 'fetch-push-logs',

Просмотреть файл

@@ -358,12 +358,17 @@ def _schedule_log_parsing(job, job_logs, result):
job_log_ids.append(job_log.id)
# TODO: Replace the use of different queues for failures vs not with the
# RabbitMQ priority feature (since the idea behind separate queues was
# only to ensure failures are dealt with first if there is a backlog).
if result != 'success':
queue = 'log_parser_fail'
priority = 'failures'
else:
queue = 'log_parser'
priority = "normal"
parse_logs.apply_async(routing_key="log_parser.%s" % priority,
parse_logs.apply_async(queue=queue,
args=[job.id, job_log_ids, priority])

Просмотреть файл

@@ -56,7 +56,7 @@ class Command(BaseCommand):
fetch_push_id = None
# make sure all tasks are run synchronously / immediately
settings.CELERY_ALWAYS_EAGER = True
settings.CELERY_TASK_ALWAYS_EAGER = True
# get hg pushlog
pushlog_url = '%s/json-pushes/?full=1&version=2' % repo.url

Просмотреть файл

@@ -130,7 +130,7 @@ def _load_perf_datum(job, perf_datum):
if signature.should_alert is not False and datum_created and \
job.repository.performance_alerts_enabled:
generate_alerts.apply_async(args=[signature.id],
routing_key='generate_perf_alerts')
queue='generate_perf_alerts')
for subtest in suite['subtests']:
subtest_properties = {
@@ -188,7 +188,7 @@ def _load_perf_datum(job, perf_datum):
suite.get('value') is None)) and
datum_created and job.repository.performance_alerts_enabled):
generate_alerts.apply_async(args=[signature.id],
routing_key='generate_perf_alerts')
queue='generate_perf_alerts')
def store_performance_artifact(job, artifact):

Просмотреть файл

@@ -14,7 +14,7 @@ def fetch_push_logs():
active_status="active"):
fetch_hg_push_log.apply_async(
args=(repo.name, repo.url),
routing_key='pushlog'
queue='pushlog'
)

Просмотреть файл

@@ -79,9 +79,11 @@ def parse_logs(job_id, job_log_ids, priority):
if success:
logger.debug("Scheduling autoclassify for job %i", job_id)
autoclassify.apply_async(
args=[job_id],
routing_key="autoclassify.%s" % priority)
# TODO: Replace the use of different queues for failures vs not with the
# RabbitMQ priority feature (since the idea behind separate queues was
# only to ensure failures are dealt with first if there is a backlog).
queue = 'log_autoclassify_fail' if priority == 'failures' else 'log_autoclassify'
autoclassify.apply_async(args=[job_id], queue=queue)
else:
job.autoclassify_status = Job.SKIPPED
else:

Просмотреть файл

@@ -98,7 +98,7 @@ class JobConsumer(PulseConsumer):
logger.info('received job message from %s#%s', exchange, routing_key)
store_pulse_jobs.apply_async(
args=[body, exchange, routing_key],
routing_key='store_pulse_jobs'
queue='store_pulse_jobs'
)
message.ack()
@@ -112,7 +112,7 @@ class PushConsumer(PulseConsumer):
logger.info('received push message from %s#%s', exchange, routing_key)
store_pulse_pushes.apply_async(
args=[body, exchange, routing_key],
routing_key='store_pulse_pushes'
queue='store_pulse_pushes'
)
message.ack()