* Update celery, config and dependencies to 4.x

* Remove usage of TaskSet, replace with group(), update `ampq`
* Enable redis on travis for better celery 4.x testing
* Fix lib/celery_tests
* Correct test testing email sending retrying on errors
* Fix celery subtask mocking errors.

* Fix celery broker url env variable

* Integrate watchdog for celery autoreload

* Ensure dependencies are updated for worker and web container.

* Restart all services after the dependency update

* Remove explicit celery loader environment variable.

* Restart all services in web and worker after running 'update_docker'

* Increase sleep time a bit
This commit is contained in:
Christopher Grebs 2017-10-16 11:16:41 +02:00 коммит произвёл GitHub
Родитель c03f066966
Коммит 176383b168
23 изменённых файлов: 174 добавлений и 101 удалений

Просмотреть файл

@ -28,6 +28,7 @@ services:
- mysql
- memcached
- elasticsearch
- redis
addons:
apt:

Просмотреть файл

@ -40,6 +40,8 @@ test_failed:
update_docker:
docker-compose exec worker make update_deps
docker-compose exec web make update
docker-compose exec web supervisorctl restart all
docker-compose exec worker supervisorctl restart all
initialize_docker:
docker-compose exec web make initialize

Просмотреть файл

@ -64,7 +64,13 @@ jobs:
set -x
sudo sysctl -w vm.max_map_count=262144
docker-compose up -d
sleep 10
sleep 20
docker-compose ps
# Make sure dependencies get updated in worker and web container
docker-compose exec worker make -f Makefile-docker update_deps
docker-compose exec worker supervisorctl restart all
docker-compose exec web make -f Makefile-docker update_deps
docker-compose exec web supervisorctl restart all
docker-compose exec web bash /code/scripts/ui-test.sh
- store_artifacts:
path: ui-test-results
@ -88,6 +94,7 @@ jobs:
key: uitest-cache-
paths:
- .tox
workflows:
version: 2
build_test_deploy_release:

Просмотреть файл

@ -8,7 +8,7 @@ services:
volumes:
- .:/code
environment:
- BROKER_URL=amqp://olympia:olympia@rabbitmq/olympia
- CELERY_BROKER_URL=amqp://olympia:olympia@rabbitmq/olympia
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- DATABASE_URL=mysql://root:@mysqld/olympia
- ELASTICSEARCH_LOCATION=elasticsearch:9200

Просмотреть файл

@ -8,7 +8,7 @@ logfile=/code/logs/supervisord-celery.log
# Changes:
# - Set concurrency (number of workers) back to the default
# - The code auto-reloads for development
command=celery -A olympia.amo.celery:app worker --autoreload -E --loglevel=INFO
command=watchmedo auto-restart -d src/ -p '*.py' -- celery -A olympia.amo.celery:app worker -E --loglevel=INFO
directory=/code
stopasgroup=true
autostart=true

Просмотреть файл

@ -63,3 +63,17 @@ meld3==1.0.2 \
--hash=sha256:b28a9bfac342aadb4557aa144bea9f8e6208bfb0596190570d10a892d35ff7dc
supervisor==3.3.3 \
--hash=sha256:96287ebfabf9a6923f74123b056c4da39c617fef367980f007cac02fba6527ad
watchdog==0.8.3 \
--hash=sha256:7e65882adb7746039b6f3876ee174952f8eaaa34491ba34333ddf1fe35de4162
PyYAML==3.12 \
--hash=sha256:3262c96a1ca437e7e4763e2843746588a965426550f3797a79fca9c6199c431f \
--hash=sha256:16b20e970597e051997d90dc2cddc713a2876c47e3d92d59ee198700c5427736 \
--hash=sha256:592766c6303207a20efc445587778322d7f73b161bd994f227adaa341ba212ab \
--hash=sha256:ca233c64c6e40eaa6c66ef97058cdc80e8d0157a443655baa1b2966e812807ca \
--hash=sha256:5f84523c076ad14ff5e6c037fe1c89a7f73a3e04cf0377cb4d017014976433f3 \
--hash=sha256:5ac82e411044fb129bae5cfbeb3ba626acb2af31a8d17d175004b70862a741a7
argh==0.26.2 \
--hash=sha256:a9b3aaa1904eeb78e32394cd46c6f37ac0fb4af6dc488daa58971bdc7d7fcaf3 \
--hash=sha256:e9535b8c84dc9571a48999094fda7f33e63c3f1b74f3e5f3ac0105a58405bb65
pathtools==0.1.2 \
--hash=sha256:7c35c5421a39bb82e58018febd90e3b6e5db34c5443aaaf742b3f33d4655f1c0

Просмотреть файл

@ -72,9 +72,9 @@ amo-validator==1.10.73 \
--hash=sha256:49e01d13b2794bcf3e9ba0dfacd9ec648e163a4a43976b3688de70549f7d22d9 \
--hash=sha256:2d75369941a1bddc99e4a57c36c476741114e926397c36aac01351d36f1c8314
# amqp is required by kombu
amqp==1.4.9 \
--hash=sha256:e0ed0ce6b8ffe5690a2e856c7908dc557e0e605283d6885dd1361d79f2928908 \
--hash=sha256:2dea4d16d073c902c3b89d9b96620fb6729ac0f7a923bbc777cb4ad827c0c61a # pyup: >=1.4.9,<2.0
amqp==2.2.2 \
--hash=sha256:4e28d3ea61a64ae61830000c909662cb053642efddbe96503db0e7783a6ee85b \
--hash=sha256:cba1ace9d4ff6049b190d8b7991f9c1006b443a5238021aca96dd6ad2ac9da22
# anyjson is required by kombu
anyjson==0.3.3 \
--hash=sha256:37812d863c9ad3e35c0734c42e0bf0320ce8c3bed82cd20ad54cb34d158157ba
@ -83,17 +83,27 @@ argparse==1.4.0 \
--hash=sha256:c31647edb69fd3d465a847ea3157d37bed1f95f19760b11a47aa91c04b666314 \
--hash=sha256:62b089a55be1d8949cd2bc7e0df0bddb9e028faefc8c32038cc84862aefdd6e4
# billiard is required by celery
billiard==3.3.0.23 \
--hash=sha256:c0cbe8d45ba8d8213ad68ef9a1881002a151569c9424d551634195a18c3a4160 \
--hash=sha256:82041dbaa62f7fde1464d7ab449978618a38b241b40c0d31dafabb36446635dc \
--hash=sha256:958fc9f8fd5cc9b936b2cb9d96f02aa5ec3613ba13ee7f089c77ff0bcc368fac \
--hash=sha256:692a2a5a55ee39a42bcb7557930e2541da85df9ea81c6e24827f63b80cd39d0b # pyup: ==3.3.0.23
billiard==3.5.0.3 \
--hash=sha256:1d7b22bdc47aa52841120fcd22a74ae4fc8c13e9d3935643098184f5788c3ce6
bleach==1.5.0 \
--hash=sha256:e67f46adcec78dbc3c04462f3aba3213a673d5652eba2609ed1ef15492a44b8d \
--hash=sha256:978e758599b54cd3caa2e160d74102879b230ea8dc93871d0783721eef58bc65 # pyup: ==1.5.0
celery==3.1.23 \
--hash=sha256:eaf5dee3becbc35c7754a2d4482d53bdf72ea3f85dd258525259983262081474 \
--hash=sha256:1a359c815837f9dbf193a7dbc6addafa34612c077ff70c66e3b16e14eebd2418 # pyup: ==3.1.23
celery==4.1.0 \
--hash=sha256:81a67f0d53a688ec2bc8557bd5d6d7218f925a6f2e6df80e01560de9e28997ec \
--hash=sha256:77ff3730198d6a17b3c1f05579ebe570b579efb35f6d7e13dba3b1368d068b35
certifi==2017.7.27.1 \
--hash=sha256:54a07c09c586b0e4c619f02a5e94e36619da8e2b053e20f594348c0611803704 \
--hash=sha256:40523d2efb60523e113b44602298f0960e900388cf3bb6043f645cf57ea9e3f5
# cffi is required by cryptography
cffi==1.11.1 \
--hash=sha256:5af456d82aba74acfe63ec5c472a330ddaff8633bcc928c965236cee8db9e8a3 \
--hash=sha256:60443dede4a6027828fd1a29f024f090b02eb1ebd8b3d1d0b5f5195c27d83a2a \
--hash=sha256:5b5161ac1f475ec677ce5761253ca168567d5e6a38089f51131862dba720f6b8 \
--hash=sha256:c9faf0ef88c556e41bba2d26e5d2ef60854a2a08c2c86c0f05e3ccb6b6e87494 \
--hash=sha256:39ed8eff749296ede7fca5b5006b8a529f24e304668584e8cf0966b260d3ffd4 \
--hash=sha256:689546ed8386d4642a744001fa88fd1acffe52cf9843d87e21050e8553300164 \
--hash=sha256:3504eb0b3fa0688e0b9374b975862d511f159d478d06378fed86de86d1217bfb \
--hash=sha256:4c40817cc0f71b5351eb0bdd0b585db4a285c2bcc03fbcb961b79bb8086b7576
# chardet is required by requests
chardet==3.0.4 \
--hash=sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691 \
@ -214,9 +224,9 @@ jingo_minify==0.7.0 \
--hash=sha256:1427946e8d4082c909a98dbacfe1cd908cbdc83572003e1a0236da02b350e31f \
--hash=sha256:f47ec7868467a1b270e115c301a4270137836ddb7b5f57552e65dff5c86aee05
# kombu is required by celery
kombu==3.0.35 \
--hash=sha256:2c59a5e087d5895675cdb4d6a38a0aa147f0411366e68330a76e480ba3b25727 \
--hash=sha256:22ab336a17962717a5d9470547e5508d4bcf1b6ec10cd9486868daf4e5edb727 # pyup: ==3.0.35
kombu==4.1.0 \
--hash=sha256:01f0da9fe222a2183345004243d1518c0fbe5875955f1b24842f2d9c65709ade \
--hash=sha256:4249d9dd9dbf1fcec471d1c2def20653c9310dd1a217272d77e4844f9d5273cb
# lxml is required by pyquery
lxml==4.1.0 \
--hash=sha256:7a8715539adb41c78129983ba69d852e0102a3f51d559eeb91dce1f6290c4ad0 \

Просмотреть файл

@ -46,7 +46,7 @@ USE_MOZLOG = False
SESSION_COOKIE_SECURE = False
SESSION_COOKIE_DOMAIN = None
CELERY_ALWAYS_EAGER = False
CELERY_TASK_ALWAYS_EAGER = False
# If you want to allow self-reviews for add-ons/apps, then enable this.
# In production we do not want to allow this.

Просмотреть файл

@ -44,7 +44,7 @@ AUTHENTICATION_BACKENDS = (
'olympia.users.backends.TestUserBackend',
)
CELERY_ALWAYS_EAGER = True
CELERY_TASK_ALWAYS_EAGER = True
DEBUG = False
# We won't actually send an email.

Просмотреть файл

@ -8,7 +8,7 @@ from django.db.models import Q, F, Avg
from django.utils.encoding import force_text
import multidb
from celery.task.sets import TaskSet
from celery import group
import waffle
import olympia.core.logger
@ -44,7 +44,7 @@ def update_addon_average_daily_users():
ts = [_update_addon_average_daily_users.subtask(args=[chunk])
for chunk in chunked(d, 250)]
TaskSet(ts).apply_async()
group(ts).apply_async()
@task
@ -94,7 +94,7 @@ def update_addon_download_totals():
ts = [_update_addon_download_totals.subtask(args=[chunk])
for chunk in chunked(d, 250)]
TaskSet(ts).apply_async()
group(ts).apply_async()
@task
@ -175,7 +175,7 @@ def update_addon_appsupport():
task_log.info('Updating appsupport for %d new-ish addons.' % len(ids))
ts = [_update_appsupport.subtask(args=[chunk])
for chunk in chunked(ids, 20)]
TaskSet(ts).apply_async()
group(ts).apply_async()
def update_all_appsupport():
@ -273,7 +273,7 @@ def reindex_addons(index=None, addon_type=None):
ids = ids.filter(type=addon_type)
ts = [tasks.index_addons.subtask(args=[chunk], kwargs=dict(index=index))
for chunk in chunked(sorted(list(ids)), 150)]
TaskSet(ts).apply_async()
group(ts).apply_async()
def cleanup_image_files():

Просмотреть файл

@ -1,3 +1,5 @@
from contextlib import contextmanager
import mock
from django.conf import settings
@ -8,7 +10,8 @@ import pytest
from olympia import amo
from olympia.activity.models import AddonLog
from olympia.addons.management.commands import approve_addons
from olympia.addons.management.commands import (
approve_addons, process_addons as pa)
from olympia.addons.models import Addon
from olympia.amo.tests import (
addon_factory, AMOPaths, TestCase, version_factory)
@ -214,9 +217,33 @@ def test_process_addons_invalid_task():
call_command('process_addons', task='foo')
@contextmanager
def count_subtask_calls(original_function):
"""Mock a celery tasks subtask method and record it's calls.
You can't mock a celery task `.subtask` method if that task is used
inside a chord or group unfortunately because of some type checking
that is use inside Celery 4+.
So this wraps the original method and restores it and records the calls
on it's own.
"""
original_function_subtask = original_function.subtask
called = []
def _subtask_wrapper(*args, **kwargs):
called.append({'args': args, 'kwargs': kwargs})
return original_function_subtask(*args, **kwargs)
original_function.subtask = _subtask_wrapper
yield called
original_function.subtask = original_function_subtask
class AddFirefox57TagTestCase(TestCase):
@mock.patch('olympia.addons.tasks.add_firefox57_tag.subtask')
def test_affects_only_public_webextensions(self, add_firefox57_tag_mock):
def test_affects_only_public_webextensions(self):
addon_factory()
addon_factory(file_kw={'is_webextension': True,
'status': amo.STATUS_AWAITING_REVIEW},
@ -225,13 +252,14 @@ class AddFirefox57TagTestCase(TestCase):
public_mozilla_signed = addon_factory(file_kw={
'is_mozilla_signed_extension': True})
call_command(
'process_addons', task='add_firefox57_tag_to_webextensions')
with count_subtask_calls(pa.add_firefox57_tag) as calls:
call_command(
'process_addons', task='add_firefox57_tag_to_webextensions')
assert add_firefox57_tag_mock.call_count == 1
add_firefox57_tag_mock.assert_called_with(
args=[[public_webextension.pk, public_mozilla_signed.pk]],
kwargs={})
assert len(calls) == 1
assert calls[0]['kwargs']['args'] == [
[public_webextension.pk, public_mozilla_signed.pk]
]
def test_tag_added_for_is_webextension(self):
self.addon = addon_factory(file_kw={'is_webextension': True})
@ -258,9 +286,7 @@ class AddFirefox57TagTestCase(TestCase):
class RecalculateWeightTestCase(TestCase):
@mock.patch('olympia.editors.tasks.recalculate_post_review_weight.subtask')
def test_only_affects_auto_approved(
self, recalculate_post_review_weight_mock):
def test_only_affects_auto_approved(self):
# Non auto-approved add-on, should not be considered.
addon_factory()
@ -291,12 +317,12 @@ class RecalculateWeightTestCase(TestCase):
version_factory(
addon=auto_approved_addon, channel=amo.RELEASE_CHANNEL_UNLISTED)
call_command(
'process_addons', task='recalculate_post_review_weight')
with count_subtask_calls(pa.recalculate_post_review_weight) as calls:
call_command(
'process_addons', task='recalculate_post_review_weight')
assert recalculate_post_review_weight_mock.call_count == 1
recalculate_post_review_weight_mock.assert_called_with(
args=[[auto_approved_addon.pk]], kwargs={})
assert len(calls) == 1
assert calls[0]['kwargs']['args'] == [[auto_approved_addon.pk]]
def test_task_works_correctly(self):
addon = addon_factory(average_daily_users=100000)
@ -322,9 +348,7 @@ class BumpAppVerForLegacyAddonsTestCase(AMOPaths, TestCase):
self.firefox_for_android_56_star, _ = AppVersion.objects.get_or_create(
application=amo.ANDROID.id, version='56.*')
@mock.patch('olympia.addons.tasks.bump_appver_for_legacy_addons.subtask')
def test_only_affects_legacy_addons_targeting_firefox_lower_than_56(
self, bump_appver_for_legacy_addons_mock):
def test_only_affects_legacy_addons_targeting_firefox_lower_than_56(self):
# Should be included:
addon = addon_factory(version_kw={'max_app_version': '55.*'})
addon2 = addon_factory(version_kw={'application': amo.ANDROID.id})
@ -343,11 +367,13 @@ class BumpAppVerForLegacyAddonsTestCase(AMOPaths, TestCase):
ApplicationsVersions.objects.get_or_create(
application=amo.FIREFOX.id, version=weird_addon.current_version,
min=av_min, max=self.firefox_56_star)
call_command('process_addons', task='bump_appver_for_legacy_addons')
assert bump_appver_for_legacy_addons_mock.call_count == 1
assert (
bump_appver_for_legacy_addons_mock.call_args[1]['args'] ==
[[addon.pk, addon2.pk, addon3.pk]])
with count_subtask_calls(pa.bump_appver_for_legacy_addons) as calls:
call_command(
'process_addons', task='bump_appver_for_legacy_addons')
assert len(calls) == 1
assert calls[0]['kwargs']['args'] == [[addon.pk, addon2.pk, addon3.pk]]
@mock.patch('olympia.addons.tasks.index_addons.delay')
@mock.patch('olympia.addons.tasks.bump_appver_for_addon_if_necessary')

Просмотреть файл

@ -24,8 +24,8 @@ log = olympia.core.logger.getLogger('z.task')
app = Celery('olympia')
task = app.task
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# Hook up Sentry in celery.
raven_client = Client(settings.SENTRY_DSN)

Просмотреть файл

@ -6,6 +6,7 @@ from django.core import mail
from django.core.files.storage import default_storage as storage
from django.core.mail import EmailMessage
from django.utils import translation
from celery.exceptions import Retry
import mock
@ -280,10 +281,12 @@ class TestSendMail(BaseTestCase):
send_mail('test subject',
'test body',
recipient_list=['somebody@mozilla.org'])
send_mail('test subject',
'test body',
async=True,
recipient_list=['somebody@mozilla.org'])
with self.assertRaises(Retry):
send_mail('test subject',
'test body',
async=True,
recipient_list=['somebody@mozilla.org'])
@mock.patch('olympia.amo.tasks.EmailMessage')
def test_async_will_stop_retrying(self, backend):

Просмотреть файл

@ -3,7 +3,7 @@ from datetime import date
from django.db import connection
from django.db.models import Count
from celery.task.sets import TaskSet
from celery import group
import olympia.core.logger
from olympia import amo
@ -25,7 +25,7 @@ def update_collections_subscribers():
ts = [_update_collections_subscribers.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
group(ts).apply_async()
@task(rate_limit='15/m')
@ -66,11 +66,11 @@ def update_collections_votes():
ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_up'])
for chunk in chunked(up, 1000)]
TaskSet(ts).apply_async()
group(ts).apply_async()
ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_down'])
for chunk in chunked(down, 1000)]
TaskSet(ts).apply_async()
group(ts).apply_async()
@task(rate_limit='15/m')
@ -100,4 +100,4 @@ def reindex_collections(index=None):
taskset = [tasks.index_collections.subtask(args=[chunk],
kwargs=dict(index=index))
for chunk in chunked(sorted(list(ids)), 150)]
TaskSet(taskset).apply_async()
group(taskset).apply_async()

Просмотреть файл

@ -100,11 +100,11 @@ SECRET_KEY = env('SECRET_KEY')
LOG_LEVEL = logging.DEBUG
# Celery
BROKER_URL = env('BROKER_URL')
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_TASK_IGNORE_RESULT = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT')

Просмотреть файл

@ -85,11 +85,11 @@ SECRET_KEY = env('SECRET_KEY')
# Celery
BROKER_URL = env('BROKER_URL')
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
BROKER_CONNECTION_TIMEOUT = 0.5
CELERY_TASK_IGNORE_RESULT = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_BROKER_CONNECTION_TIMEOUT = 0.5
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT')

Просмотреть файл

@ -97,11 +97,11 @@ SECRET_KEY = env('SECRET_KEY')
LOG_LEVEL = logging.DEBUG
# Celery
BROKER_URL = env('BROKER_URL')
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_TASK_IGNORE_RESULT = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT')

Просмотреть файл

@ -1,6 +1,6 @@
from django.core.management.base import BaseCommand
from celery.task.sets import TaskSet
from celery import group
from olympia.amo.utils import chunked
from olympia.activity.models import ActivityLog
@ -16,4 +16,4 @@ class Command(BaseCommand):
ts = [add_versionlog.subtask(args=[chunk])
for chunk in chunked(pks, 100)]
TaskSet(ts).apply_async()
group(ts).apply_async()

Просмотреть файл

@ -1081,23 +1081,33 @@ VALIDATION_FAQ_URL = ('https://wiki.mozilla.org/Add-ons/Reviewers/Guide/'
# Celery
BROKER_URL = os.environ.get('BROKER_URL',
'amqp://olympia:olympia@localhost:5672/olympia')
BROKER_CONNECTION_TIMEOUT = 0.1
BROKER_HEARTBEAT = 60 * 15
CELERY_DEFAULT_QUEUE = 'default'
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND',
'redis://localhost:6379/1')
CELERY_BROKER_URL = os.environ.get(
'CELERY_BROKER_URL',
'amqp://olympia:olympia@localhost:5672/olympia')
CELERY_BROKER_CONNECTION_TIMEOUT = 0.1
CELERY_BROKER_HEARTBEAT = 60 * 15
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_RESULT_BACKEND = os.environ.get(
'CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
CELERY_IGNORE_RESULT = True
CELERY_TASK_IGNORE_RESULT = True
CELERY_SEND_TASK_ERROR_EMAILS = True
CELERYD_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
# Allow upgrading to Celery 4.x (JSON only)
# Explicitly force task serializer and result serializer to create tasks
# using the JSON format but still accept pickled messages.
# TODO: Remove `pickle` once the upgrade is done and seems stable.
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = (
'olympia.lib.crypto.tasks',
'olympia.lib.es.management.commands.reindex',
)
CELERY_QUEUES = (
CELERY_TASK_QUEUES = (
Queue('default', routing_key='default'),
Queue('priority', routing_key='priority'),
Queue('devhub', routing_key='devhub'),
@ -1122,7 +1132,7 @@ CELERY_QUEUES = (
# Some notes:
# - always add routes here instead of @task(queue=<name>)
# - when adding a queue, be sure to update deploy.py so that it gets restarted
CELERY_ROUTES = {
CELERY_TASK_ROUTES = {
# Priority.
# If your tasks need to be run as soon as possible, add them here so they
# are routed to the priority queue.
@ -1278,12 +1288,12 @@ CELERY_TIME_LIMITS = {
}
# When testing, we always want tasks to raise exceptions. Good for sanity.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
CELERY_TASK_EAGER_PROPAGATES = True
# Time in seconds before celery.exceptions.SoftTimeLimitExceeded is raised.
# The task can catch that and recover but should exit ASAP. Note that there is
# a separate, shorter timeout for validation tasks.
CELERYD_TASK_SOFT_TIME_LIMIT = 60 * 30
CELERY_TASK_SOFT_TIME_LIMIT = 60 * 30
# Logging
LOG_LEVEL = logging.DEBUG
@ -1545,7 +1555,7 @@ LOGIN_RATELIMIT_ALL_USERS = '15/m'
CSRF_FAILURE_VIEW = 'olympia.amo.views.csrf_failure'
# Testing responsiveness without rate limits.
CELERY_DISABLE_RATE_LIMITS = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# Default file storage mechanism that holds media.
DEFAULT_FILE_STORAGE = 'olympia.amo.utils.LocalFileStorage'

Просмотреть файл

@ -2,12 +2,14 @@ from django.conf import settings
def test_celery_routes_in_queues():
queues_in_queues = set([q.name for q in settings.CELERY_QUEUES])
queues_in_queues = set([q.name for q in settings.CELERY_TASK_QUEUES])
# check the default queue is defined in CELERY_QUEUES
assert settings.CELERY_DEFAULT_QUEUE in queues_in_queues
assert settings.CELERY_TASK_DEFAULT_QUEUE in queues_in_queues
# then remove it as it won't be in CELERY_ROUTES
queues_in_queues.remove(settings.CELERY_DEFAULT_QUEUE)
queues_in_queues.remove(settings.CELERY_TASK_DEFAULT_QUEUE)
queues_in_routes = set(
[c['queue'] for c in settings.CELERY_ROUTES.values()])
[c['queue'] for c in settings.CELERY_TASK_ROUTES.values()])
assert queues_in_queues == queues_in_routes

Просмотреть файл

@ -4,7 +4,7 @@ from django.core.management import call_command
from django.db.models import Sum, Max
import waffle
from celery.task.sets import TaskSet
from celery import group
import olympia.core.logger
from olympia.amo.utils import chunked
@ -26,7 +26,7 @@ def update_addons_collections_downloads():
ts = [tasks.update_addons_collections_downloads.subtask(args=[chunk])
for chunk in chunked(d, 100)]
TaskSet(ts).apply_async()
group(ts).apply_async()
def update_collections_total():
@ -37,7 +37,7 @@ def update_collections_total():
ts = [tasks.update_collections_total.subtask(args=[chunk])
for chunk in chunked(d, 50)]
TaskSet(ts).apply_async()
group(ts).apply_async()
def update_global_totals(date=None):
@ -57,7 +57,7 @@ def update_global_totals(date=None):
ts = [tasks.update_global_totals.subtask(kwargs=kw)
for kw in today_jobs + metrics_jobs]
TaskSet(ts).apply_async()
group(ts).apply_async()
def update_google_analytics(date=None):

Просмотреть файл

@ -1,7 +1,7 @@
from django.db import connections
import multidb
from celery.task.sets import TaskSet
from celery import group
import olympia.core.logger
@ -42,4 +42,5 @@ def update_user_ratings():
ts = [update_user_ratings_task.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
group(ts).apply_async()

Просмотреть файл

@ -8,9 +8,6 @@ log = logging.getLogger('z.startup')
# Remember when mod_wsgi loaded this file so we can track it in nagios.
wsgi_loaded = datetime.now()
# Tell celery that we're using Django.
os.environ['CELERY_LOADER'] = 'django'
import django
import django.conf
from django.core.wsgi import get_wsgi_application