This reverts commit 541688d5f2.

This caused some breakage on dev. We suspect it's because New Relic
hooks into the Celery task-launching code but does not yet support
Celery 4.x.
This commit is contained in:
Mathieu Pillard 2017-10-13 00:04:27 +02:00
Родитель 2fb07a6274
Коммит 43d88b4f55
21 изменённых файлов: 98 добавлений и 159 удалений

Просмотреть файл

@@ -28,7 +28,6 @@ services:
- mysql
- memcached
- elasticsearch
- redis
addons:
apt:

Просмотреть файл

@@ -64,13 +64,7 @@ jobs:
set -x
sudo sysctl -w vm.max_map_count=262144
docker-compose up -d
sleep 5
docker-compose ps
# Make sure dependencies get updated in worker and web container
docker-compose exec worker make -f Makefile-docker update_deps
docker-compose exec worker supervisorctl restart all
docker-compose exec web make -f Makefile-docker update_deps
docker-compose exec web supervisorctl restart all
sleep 10
docker-compose exec web bash /code/scripts/ui-test.sh
- store_artifacts:
path: ui-test-results
@@ -94,7 +88,6 @@ jobs:
key: uitest-cache-
paths:
- .tox
workflows:
version: 2
build_test_deploy_release:

Просмотреть файл

@@ -8,7 +8,7 @@ services:
volumes:
- .:/code
environment:
- CELERY_BROKER_URL=amqp://olympia:olympia@rabbitmq/olympia
- BROKER_URL=amqp://olympia:olympia@rabbitmq/olympia
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- DATABASE_URL=mysql://root:@mysqld/olympia
- ELASTICSEARCH_LOCATION=elasticsearch:9200

Просмотреть файл

@@ -8,7 +8,7 @@ logfile=/code/logs/supervisord-celery.log
# Changes:
# - Set concurrency (number of workers) back to the default
# - The code auto-reloads for development
command=watchmedo auto-restart -d src/ -p '*.py' -- celery -A olympia.amo.celery:app worker -E --loglevel=INFO
command=celery -A olympia.amo.celery:app worker --autoreload -E --loglevel=INFO
directory=/code
stopasgroup=true
autostart=true

Просмотреть файл

@@ -49,17 +49,3 @@ cookies==2.2.1 \
responses==0.8.1 \
--hash=sha256:98e1c0eb5a7a03d59e73c8ac774428664f319ef35c6ac59479436bbb9c3499be \
--hash=sha256:a64029dbc6bed7133e2c971ee52153f30e779434ad55a5abf40322bcff91d029
watchdog==0.8.3 \
--hash=sha256:7e65882adb7746039b6f3876ee174952f8eaaa34491ba34333ddf1fe35de4162
PyYAML==3.12 \
--hash=sha256:3262c96a1ca437e7e4763e2843746588a965426550f3797a79fca9c6199c431f \
--hash=sha256:16b20e970597e051997d90dc2cddc713a2876c47e3d92d59ee198700c5427736 \
--hash=sha256:592766c6303207a20efc445587778322d7f73b161bd994f227adaa341ba212ab \
--hash=sha256:ca233c64c6e40eaa6c66ef97058cdc80e8d0157a443655baa1b2966e812807ca \
--hash=sha256:5f84523c076ad14ff5e6c037fe1c89a7f73a3e04cf0377cb4d017014976433f3 \
--hash=sha256:5ac82e411044fb129bae5cfbeb3ba626acb2af31a8d17d175004b70862a741a7
argh==0.26.2 \
--hash=sha256:a9b3aaa1904eeb78e32394cd46c6f37ac0fb4af6dc488daa58971bdc7d7fcaf3 \
--hash=sha256:e9535b8c84dc9571a48999094fda7f33e63c3f1b74f3e5f3ac0105a58405bb65
pathtools==0.1.2 \
--hash=sha256:7c35c5421a39bb82e58018febd90e3b6e5db34c5443aaaf742b3f33d4655f1c0

Просмотреть файл

@@ -72,9 +72,9 @@ amo-validator==1.10.73 \
--hash=sha256:49e01d13b2794bcf3e9ba0dfacd9ec648e163a4a43976b3688de70549f7d22d9 \
--hash=sha256:2d75369941a1bddc99e4a57c36c476741114e926397c36aac01351d36f1c8314
# amqp is required by kombu
amqp==2.2.2 \
--hash=sha256:4e28d3ea61a64ae61830000c909662cb053642efddbe96503db0e7783a6ee85b \
--hash=sha256:cba1ace9d4ff6049b190d8b7991f9c1006b443a5238021aca96dd6ad2ac9da22
amqp==1.4.9 \
--hash=sha256:e0ed0ce6b8ffe5690a2e856c7908dc557e0e605283d6885dd1361d79f2928908 \
--hash=sha256:2dea4d16d073c902c3b89d9b96620fb6729ac0f7a923bbc777cb4ad827c0c61a # pyup: >=1.4.9,<2.0
# anyjson is required by kombu
anyjson==0.3.3 \
--hash=sha256:37812d863c9ad3e35c0734c42e0bf0320ce8c3bed82cd20ad54cb34d158157ba
@@ -91,14 +91,17 @@ asn1crypto==0.23.0 \
--hash=sha256:654b7db3b120e23474e9a1e5e38d268c77e58a9e17d2cb595456c37309846494 \
--hash=sha256:0874981329cfebb366d6584c3d16e913f2a0eb026c9463efcc4aaf42a9d94d70
# billiard is required by celery
billiard==3.5.0.3 \
--hash=sha256:1d7b22bdc47aa52841120fcd22a74ae4fc8c13e9d3935643098184f5788c3ce6
billiard==3.3.0.23 \
--hash=sha256:c0cbe8d45ba8d8213ad68ef9a1881002a151569c9424d551634195a18c3a4160 \
--hash=sha256:82041dbaa62f7fde1464d7ab449978618a38b241b40c0d31dafabb36446635dc \
--hash=sha256:958fc9f8fd5cc9b936b2cb9d96f02aa5ec3613ba13ee7f089c77ff0bcc368fac \
--hash=sha256:692a2a5a55ee39a42bcb7557930e2541da85df9ea81c6e24827f63b80cd39d0b # pyup: ==3.3.0.23
bleach==1.5.0 \
--hash=sha256:e67f46adcec78dbc3c04462f3aba3213a673d5652eba2609ed1ef15492a44b8d \
--hash=sha256:978e758599b54cd3caa2e160d74102879b230ea8dc93871d0783721eef58bc65 # pyup: ==1.5.0
celery==4.1.0 \
--hash=sha256:81a67f0d53a688ec2bc8557bd5d6d7218f925a6f2e6df80e01560de9e28997ec \
--hash=sha256:77ff3730198d6a17b3c1f05579ebe570b579efb35f6d7e13dba3b1368d068b35
celery==3.1.23 \
--hash=sha256:eaf5dee3becbc35c7754a2d4482d53bdf72ea3f85dd258525259983262081474 \
--hash=sha256:1a359c815837f9dbf193a7dbc6addafa34612c077ff70c66e3b16e14eebd2418 # pyup: ==3.1.23
certifi==2017.7.27.1 \
--hash=sha256:54a07c09c586b0e4c619f02a5e94e36619da8e2b053e20f594348c0611803704 \
--hash=sha256:40523d2efb60523e113b44602298f0960e900388cf3bb6043f645cf57ea9e3f5
@@ -296,9 +299,9 @@ jingo_minify==0.7.0 \
--hash=sha256:1427946e8d4082c909a98dbacfe1cd908cbdc83572003e1a0236da02b350e31f \
--hash=sha256:f47ec7868467a1b270e115c301a4270137836ddb7b5f57552e65dff5c86aee05
# kombu is required by celery
kombu==4.1.0 \
--hash=sha256:01f0da9fe222a2183345004243d1518c0fbe5875955f1b24842f2d9c65709ade \
--hash=sha256:4249d9dd9dbf1fcec471d1c2def20653c9310dd1a217272d77e4844f9d5273cb
kombu==3.0.35 \
--hash=sha256:2c59a5e087d5895675cdb4d6a38a0aa147f0411366e68330a76e480ba3b25727 \
--hash=sha256:22ab336a17962717a5d9470547e5508d4bcf1b6ec10cd9486868daf4e5edb727 # pyup: ==3.0.35
# lxml is required by pyquery
lxml==4.0.0 \
--hash=sha256:3593f49858fc6229cd93326be06b099ae477fd65d8f4a981320a6d0bb7fc7a5a \

Просмотреть файл

@@ -46,7 +46,7 @@ USE_MOZLOG = False
SESSION_COOKIE_SECURE = False
SESSION_COOKIE_DOMAIN = None
CELERY_TASK_ALWAYS_EAGER = False
CELERY_ALWAYS_EAGER = False
# If you want to allow self-reviews for add-ons/apps, then enable this.
# In production we do not want to allow this.

Просмотреть файл

@@ -44,7 +44,7 @@ AUTHENTICATION_BACKENDS = (
'olympia.users.backends.TestUserBackend',
)
CELERY_TASK_ALWAYS_EAGER = True
CELERY_ALWAYS_EAGER = True
DEBUG = False
# We won't actually send an email.

Просмотреть файл

@@ -8,7 +8,7 @@ from django.db.models import Q, F, Avg
from django.utils.encoding import force_text
import multidb
from celery import group
from celery.task.sets import TaskSet
import waffle
import olympia.core.logger
@@ -44,7 +44,7 @@ def update_addon_average_daily_users():
ts = [_update_addon_average_daily_users.subtask(args=[chunk])
for chunk in chunked(d, 250)]
group(ts).apply_async()
TaskSet(ts).apply_async()
@task
@@ -94,7 +94,7 @@ def update_addon_download_totals():
ts = [_update_addon_download_totals.subtask(args=[chunk])
for chunk in chunked(d, 250)]
group(ts).apply_async()
TaskSet(ts).apply_async()
@task
@@ -175,7 +175,7 @@ def update_addon_appsupport():
task_log.info('Updating appsupport for %d new-ish addons.' % len(ids))
ts = [_update_appsupport.subtask(args=[chunk])
for chunk in chunked(ids, 20)]
group(ts).apply_async()
TaskSet(ts).apply_async()
def update_all_appsupport():
@@ -273,7 +273,7 @@ def reindex_addons(index=None, addon_type=None):
ids = ids.filter(type=addon_type)
ts = [tasks.index_addons.subtask(args=[chunk], kwargs=dict(index=index))
for chunk in chunked(sorted(list(ids)), 150)]
group(ts).apply_async()
TaskSet(ts).apply_async()
def cleanup_image_files():

Просмотреть файл

@@ -1,5 +1,3 @@
from contextlib import contextmanager
import mock
from django.conf import settings
@@ -10,8 +8,7 @@ import pytest
from olympia import amo
from olympia.activity.models import AddonLog
from olympia.addons.management.commands import (
approve_addons, process_addons as pa)
from olympia.addons.management.commands import approve_addons
from olympia.addons.models import Addon
from olympia.amo.tests import (
addon_factory, AMOPaths, TestCase, version_factory)
@@ -217,33 +214,9 @@ def test_process_addons_invalid_task():
call_command('process_addons', task='foo')
@contextmanager
def count_subtask_calls(original_function):
"""Mock a celery tasks subtask method and record it's calls.
You can't mock a celery task `.subtask` method if that task is used
inside a chord or group unfortunately because of some type checking
that is use inside Celery 4+.
So this wraps the original method and restores it and records the calls
on it's own.
"""
original_function_subtask = original_function.subtask
called = []
def _subtask_wrapper(*args, **kwargs):
called.append({'args': args, 'kwargs': kwargs})
return original_function_subtask(*args, **kwargs)
original_function.subtask = _subtask_wrapper
yield called
original_function.subtask = original_function_subtask
class AddFirefox57TagTestCase(TestCase):
def test_affects_only_public_webextensions(self):
@mock.patch('olympia.addons.tasks.add_firefox57_tag.subtask')
def test_affects_only_public_webextensions(self, add_firefox57_tag_mock):
addon_factory()
addon_factory(file_kw={'is_webextension': True,
'status': amo.STATUS_AWAITING_REVIEW},
@@ -252,14 +225,13 @@ class AddFirefox57TagTestCase(TestCase):
public_mozilla_signed = addon_factory(file_kw={
'is_mozilla_signed_extension': True})
with count_subtask_calls(pa.add_firefox57_tag) as calls:
call_command(
'process_addons', task='add_firefox57_tag_to_webextensions')
call_command(
'process_addons', task='add_firefox57_tag_to_webextensions')
assert len(calls) == 1
assert calls[0]['kwargs']['args'] == [
[public_webextension.pk, public_mozilla_signed.pk]
]
assert add_firefox57_tag_mock.call_count == 1
add_firefox57_tag_mock.assert_called_with(
args=[[public_webextension.pk, public_mozilla_signed.pk]],
kwargs={})
def test_tag_added_for_is_webextension(self):
self.addon = addon_factory(file_kw={'is_webextension': True})
@@ -286,7 +258,9 @@ class AddFirefox57TagTestCase(TestCase):
class RecalculateWeightTestCase(TestCase):
def test_only_affects_auto_approved(self):
@mock.patch('olympia.editors.tasks.recalculate_post_review_weight.subtask')
def test_only_affects_auto_approved(
self, recalculate_post_review_weight_mock):
# Non auto-approved add-on, should not be considered.
addon_factory()
@@ -317,12 +291,12 @@ class RecalculateWeightTestCase(TestCase):
version_factory(
addon=auto_approved_addon, channel=amo.RELEASE_CHANNEL_UNLISTED)
with count_subtask_calls(pa.recalculate_post_review_weight) as calls:
call_command(
'process_addons', task='recalculate_post_review_weight')
call_command(
'process_addons', task='recalculate_post_review_weight')
assert len(calls) == 1
assert calls[0]['kwargs']['args'] == [[auto_approved_addon.pk]]
assert recalculate_post_review_weight_mock.call_count == 1
recalculate_post_review_weight_mock.assert_called_with(
args=[[auto_approved_addon.pk]], kwargs={})
def test_task_works_correctly(self):
addon = addon_factory(average_daily_users=100000)
@@ -348,7 +322,9 @@ class BumpAppVerForLegacyAddonsTestCase(AMOPaths, TestCase):
self.firefox_for_android_56_star, _ = AppVersion.objects.get_or_create(
application=amo.ANDROID.id, version='56.*')
def test_only_affects_legacy_addons_targeting_firefox_lower_than_56(self):
@mock.patch('olympia.addons.tasks.bump_appver_for_legacy_addons.subtask')
def test_only_affects_legacy_addons_targeting_firefox_lower_than_56(
self, bump_appver_for_legacy_addons_mock):
# Should be included:
addon = addon_factory(version_kw={'max_app_version': '55.*'})
addon2 = addon_factory(version_kw={'application': amo.ANDROID.id})
@@ -367,13 +343,11 @@ class BumpAppVerForLegacyAddonsTestCase(AMOPaths, TestCase):
ApplicationsVersions.objects.get_or_create(
application=amo.FIREFOX.id, version=weird_addon.current_version,
min=av_min, max=self.firefox_56_star)
with count_subtask_calls(pa.bump_appver_for_legacy_addons) as calls:
call_command(
'process_addons', task='bump_appver_for_legacy_addons')
assert len(calls) == 1
assert calls[0]['kwargs']['args'] == [[addon.pk, addon2.pk, addon3.pk]]
call_command('process_addons', task='bump_appver_for_legacy_addons')
assert bump_appver_for_legacy_addons_mock.call_count == 1
assert (
bump_appver_for_legacy_addons_mock.call_args[1]['args'] ==
[[addon.pk, addon2.pk, addon3.pk]])
@mock.patch('olympia.addons.tasks.index_addons.delay')
@mock.patch('olympia.addons.tasks.bump_appver_for_addon_if_necessary')

Просмотреть файл

@@ -24,8 +24,8 @@ log = olympia.core.logger.getLogger('z.task')
app = Celery('olympia')
task = app.task
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS)
# Hook up Sentry in celery.
raven_client = Client(settings.SENTRY_DSN)

Просмотреть файл

@@ -6,7 +6,6 @@ from django.core import mail
from django.core.files.storage import default_storage as storage
from django.core.mail import EmailMessage
from django.utils import translation
from celery.exceptions import Retry
import mock
@@ -281,12 +280,10 @@ class TestSendMail(BaseTestCase):
send_mail('test subject',
'test body',
recipient_list=['somebody@mozilla.org'])
with self.assertRaises(Retry):
send_mail('test subject',
'test body',
async=True,
recipient_list=['somebody@mozilla.org'])
send_mail('test subject',
'test body',
async=True,
recipient_list=['somebody@mozilla.org'])
@mock.patch('olympia.amo.tasks.EmailMessage')
def test_async_will_stop_retrying(self, backend):

Просмотреть файл

@@ -3,7 +3,7 @@ from datetime import date
from django.db import connection
from django.db.models import Count
from celery import group
from celery.task.sets import TaskSet
import olympia.core.logger
from olympia import amo
@@ -25,7 +25,7 @@ def update_collections_subscribers():
ts = [_update_collections_subscribers.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
group(ts).apply_async()
TaskSet(ts).apply_async()
@task(rate_limit='15/m')
@@ -66,11 +66,11 @@ def update_collections_votes():
ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_up'])
for chunk in chunked(up, 1000)]
group(ts).apply_async()
TaskSet(ts).apply_async()
ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_down'])
for chunk in chunked(down, 1000)]
group(ts).apply_async()
TaskSet(ts).apply_async()
@task(rate_limit='15/m')
@@ -100,4 +100,4 @@ def reindex_collections(index=None):
taskset = [tasks.index_collections.subtask(args=[chunk],
kwargs=dict(index=index))
for chunk in chunked(sorted(list(ids)), 150)]
group(taskset).apply_async()
TaskSet(taskset).apply_async()

Просмотреть файл

@@ -100,11 +100,11 @@ SECRET_KEY = env('SECRET_KEY')
LOG_LEVEL = logging.DEBUG
# Celery
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
BROKER_URL = env('BROKER_URL')
CELERY_TASK_IGNORE_RESULT = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT')

Просмотреть файл

@@ -85,11 +85,11 @@ SECRET_KEY = env('SECRET_KEY')
# Celery
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
BROKER_URL = env('BROKER_URL')
CELERY_TASK_IGNORE_RESULT = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_BROKER_CONNECTION_TIMEOUT = 0.5
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
BROKER_CONNECTION_TIMEOUT = 0.5
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT')

Просмотреть файл

@@ -97,11 +97,11 @@ SECRET_KEY = env('SECRET_KEY')
LOG_LEVEL = logging.DEBUG
# Celery
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
BROKER_URL = env('BROKER_URL')
CELERY_TASK_IGNORE_RESULT = True
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT')

Просмотреть файл

@@ -1,6 +1,6 @@
from django.core.management.base import BaseCommand
from celery import group
from celery.task.sets import TaskSet
from olympia.amo.utils import chunked
from olympia.activity.models import ActivityLog
@@ -16,4 +16,4 @@ class Command(BaseCommand):
ts = [add_versionlog.subtask(args=[chunk])
for chunk in chunked(pks, 100)]
group(ts).apply_async()
TaskSet(ts).apply_async()

Просмотреть файл

@@ -1081,33 +1081,23 @@ VALIDATION_FAQ_URL = ('https://wiki.mozilla.org/Add-ons/Reviewers/Guide/'
# Celery
CELERY_BROKER_URL = os.environ.get(
'CELERY_BROKER_URL',
'amqp://olympia:olympia@localhost:5672/olympia')
CELERY_BROKER_CONNECTION_TIMEOUT = 0.1
CELERY_BROKER_HEARTBEAT = 60 * 15
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_RESULT_BACKEND = os.environ.get(
'CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
BROKER_URL = os.environ.get('BROKER_URL',
'amqp://olympia:olympia@localhost:5672/olympia')
BROKER_CONNECTION_TIMEOUT = 0.1
BROKER_HEARTBEAT = 60 * 15
CELERY_DEFAULT_QUEUE = 'default'
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND',
'redis://localhost:6379/1')
CELERY_TASK_IGNORE_RESULT = True
CELERY_IGNORE_RESULT = True
CELERY_SEND_TASK_ERROR_EMAILS = True
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
# Allow upgrading to Celery 4.x (JSON only)
# Explicitly force task serializer and result serializer to create tasks
# using the JSON format but still accept pickled messages.
# TODO: Remove `pickle` once the upgrade is done and seems stable.
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYD_HIJACK_ROOT_LOGGER = False
CELERY_IMPORTS = (
'olympia.lib.crypto.tasks',
'olympia.lib.es.management.commands.reindex',
)
CELERY_TASK_QUEUES = (
CELERY_QUEUES = (
Queue('default', routing_key='default'),
Queue('priority', routing_key='priority'),
Queue('devhub', routing_key='devhub'),
@@ -1132,7 +1122,7 @@ CELERY_TASK_QUEUES = (
# Some notes:
# - always add routes here instead of @task(queue=<name>)
# - when adding a queue, be sure to update deploy.py so that it gets restarted
CELERY_TASK_ROUTES = {
CELERY_ROUTES = {
# Priority.
# If your tasks need to be run as soon as possible, add them here so they
# are routed to the priority queue.
@@ -1288,12 +1278,12 @@ CELERY_TIME_LIMITS = {
}
# When testing, we always want tasks to raise exceptions. Good for sanity.
CELERY_TASK_EAGER_PROPAGATES = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
# Time in seconds before celery.exceptions.SoftTimeLimitExceeded is raised.
# The task can catch that and recover but should exit ASAP. Note that there is
# a separate, shorter timeout for validation tasks.
CELERY_TASK_SOFT_TIME_LIMIT = 60 * 30
CELERYD_TASK_SOFT_TIME_LIMIT = 60 * 30
# Logging
LOG_LEVEL = logging.DEBUG
@@ -1555,7 +1545,7 @@ LOGIN_RATELIMIT_ALL_USERS = '15/m'
CSRF_FAILURE_VIEW = 'olympia.amo.views.csrf_failure'
# Testing responsiveness without rate limits.
CELERY_WORKER_DISABLE_RATE_LIMITS = True
CELERY_DISABLE_RATE_LIMITS = True
# Default file storage mechanism that holds media.
DEFAULT_FILE_STORAGE = 'olympia.amo.utils.LocalFileStorage'

Просмотреть файл

@@ -2,14 +2,12 @@ from django.conf import settings
def test_celery_routes_in_queues():
queues_in_queues = set([q.name for q in settings.CELERY_TASK_QUEUES])
queues_in_queues = set([q.name for q in settings.CELERY_QUEUES])
# check the default queue is defined in CELERY_QUEUES
assert settings.CELERY_TASK_DEFAULT_QUEUE in queues_in_queues
assert settings.CELERY_DEFAULT_QUEUE in queues_in_queues
# then remove it as it won't be in CELERY_ROUTES
queues_in_queues.remove(settings.CELERY_TASK_DEFAULT_QUEUE)
queues_in_queues.remove(settings.CELERY_DEFAULT_QUEUE)
queues_in_routes = set(
[c['queue'] for c in settings.CELERY_TASK_ROUTES.values()])
[c['queue'] for c in settings.CELERY_ROUTES.values()])
assert queues_in_queues == queues_in_routes

Просмотреть файл

@@ -4,7 +4,7 @@ from django.core.management import call_command
from django.db.models import Sum, Max
import waffle
from celery import group
from celery.task.sets import TaskSet
import olympia.core.logger
from olympia.amo.utils import chunked
@@ -26,7 +26,7 @@ def update_addons_collections_downloads():
ts = [tasks.update_addons_collections_downloads.subtask(args=[chunk])
for chunk in chunked(d, 100)]
group(ts).apply_async()
TaskSet(ts).apply_async()
def update_collections_total():
@@ -37,7 +37,7 @@ def update_collections_total():
ts = [tasks.update_collections_total.subtask(args=[chunk])
for chunk in chunked(d, 50)]
group(ts).apply_async()
TaskSet(ts).apply_async()
def update_global_totals(date=None):
@@ -57,7 +57,7 @@ def update_global_totals(date=None):
ts = [tasks.update_global_totals.subtask(kwargs=kw)
for kw in today_jobs + metrics_jobs]
group(ts).apply_async()
TaskSet(ts).apply_async()
def update_google_analytics(date=None):

Просмотреть файл

@@ -1,7 +1,7 @@
from django.db import connections
import multidb
from celery import group
from celery.task.sets import TaskSet
import olympia.core.logger
@@ -42,5 +42,4 @@ def update_user_ratings():
ts = [update_user_ratings_task.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
group(ts).apply_async()
TaskSet(ts).apply_async()