From 43d88b4f55bbbd5f138201fad72d26ec2132eb95 Mon Sep 17 00:00:00 2001 From: Mathieu Pillard Date: Fri, 13 Oct 2017 00:04:27 +0200 Subject: [PATCH] Revert "Upgrade to Celery 4 (#6625)" This reverts commit 541688d5f2e7b53d091ad5f0775c710c59c0abc6. This caused some breakage on dev. We suspect it's because newrelic hooks into the celery task launching code but does not support celery 4.x yet. --- .travis.yml | 1 - circle.yml | 9 +-- docker-compose.yml | 2 +- docker/supervisor-celery.conf | 2 +- requirements/dev.txt | 14 ---- requirements/prod.txt | 25 +++--- settings.py | 2 +- settings_test.py | 2 +- src/olympia/addons/cron.py | 10 +-- src/olympia/addons/tests/test_commands.py | 76 ++++++------------- src/olympia/amo/celery.py | 4 +- src/olympia/amo/tests/test_send_mail.py | 11 +-- src/olympia/bandwagon/cron.py | 10 +-- src/olympia/conf/dev/settings.py | 8 +- src/olympia/conf/prod/settings.py | 8 +- src/olympia/conf/stage/settings.py | 8 +- .../editors/management/commands/versionlog.py | 4 +- src/olympia/lib/settings_base.py | 38 ++++------ src/olympia/lib/tests/test_celery.py | 10 +-- src/olympia/stats/cron.py | 8 +- src/olympia/users/cron.py | 5 +- 21 files changed, 98 insertions(+), 159 deletions(-) diff --git a/.travis.yml b/.travis.yml index 294da5f301..750fdda0dc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,7 +28,6 @@ services: - mysql - memcached - elasticsearch - - redis addons: apt: diff --git a/circle.yml b/circle.yml index 2ac7cc678f..2151d3f026 100644 --- a/circle.yml +++ b/circle.yml @@ -64,13 +64,7 @@ jobs: set -x sudo sysctl -w vm.max_map_count=262144 docker-compose up -d - sleep 5 - docker-compose ps - # Make sure dependencies get updated in worker and web container - docker-compose exec worker make -f Makefile-docker update_deps - docker-compose exec worker supervisorctl restart all - docker-compose exec web make -f Makefile-docker update_deps - docker-compose exec web supervisorctl restart all + sleep 10 docker-compose exec web bash /code/scripts/ui-test.sh - store_artifacts: path: ui-test-results @@ -94,7 +88,6 @@ jobs: key: uitest-cache- paths: - .tox - workflows: version: 2 build_test_deploy_release: diff --git a/docker-compose.yml b/docker-compose.yml index 05d9a76653..de1619488a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,7 @@ services: volumes: - .:/code environment: - - CELERY_BROKER_URL=amqp://olympia:olympia@rabbitmq/olympia + - BROKER_URL=amqp://olympia:olympia@rabbitmq/olympia - CELERY_RESULT_BACKEND=redis://redis:6379/1 - DATABASE_URL=mysql://root:@mysqld/olympia - ELASTICSEARCH_LOCATION=elasticsearch:9200 diff --git a/docker/supervisor-celery.conf b/docker/supervisor-celery.conf index fb8adfaef8..41a63a7b33 100644 --- a/docker/supervisor-celery.conf +++ b/docker/supervisor-celery.conf @@ -8,7 +8,7 @@ logfile=/code/logs/supervisord-celery.log # Changes: # - Set concurrency (number of workers) back to the default # - The code auto-reloads for development -command=watchmedo auto-restart -d src/ -p '*.py' -- celery -A olympia.amo.celery:app worker -E --loglevel=INFO +command=celery -A olympia.amo.celery:app worker --autoreload -E --loglevel=INFO directory=/code stopasgroup=true autostart=true diff --git a/requirements/dev.txt b/requirements/dev.txt index 91b8b35e3e..e470c8f3e5 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -49,17 +49,3 @@ cookies==2.2.1 \ responses==0.8.1 \ --hash=sha256:98e1c0eb5a7a03d59e73c8ac774428664f319ef35c6ac59479436bbb9c3499be \ --hash=sha256:a64029dbc6bed7133e2c971ee52153f30e779434ad55a5abf40322bcff91d029 -watchdog==0.8.3 \ - --hash=sha256:7e65882adb7746039b6f3876ee174952f8eaaa34491ba34333ddf1fe35de4162 -PyYAML==3.12 \ - --hash=sha256:3262c96a1ca437e7e4763e2843746588a965426550f3797a79fca9c6199c431f \ - --hash=sha256:16b20e970597e051997d90dc2cddc713a2876c47e3d92d59ee198700c5427736 \ - --hash=sha256:592766c6303207a20efc445587778322d7f73b161bd994f227adaa341ba212ab \ - --hash=sha256:ca233c64c6e40eaa6c66ef97058cdc80e8d0157a443655baa1b2966e812807ca \ - --hash=sha256:5f84523c076ad14ff5e6c037fe1c89a7f73a3e04cf0377cb4d017014976433f3 \ - --hash=sha256:5ac82e411044fb129bae5cfbeb3ba626acb2af31a8d17d175004b70862a741a7 -argh==0.26.2 \ - --hash=sha256:a9b3aaa1904eeb78e32394cd46c6f37ac0fb4af6dc488daa58971bdc7d7fcaf3 \ - --hash=sha256:e9535b8c84dc9571a48999094fda7f33e63c3f1b74f3e5f3ac0105a58405bb65 -pathtools==0.1.2 \ - --hash=sha256:7c35c5421a39bb82e58018febd90e3b6e5db34c5443aaaf742b3f33d4655f1c0 diff --git a/requirements/prod.txt b/requirements/prod.txt index bcb4c74930..7abf4ac5dd 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -72,9 +72,9 @@ amo-validator==1.10.73 \ --hash=sha256:49e01d13b2794bcf3e9ba0dfacd9ec648e163a4a43976b3688de70549f7d22d9 \ --hash=sha256:2d75369941a1bddc99e4a57c36c476741114e926397c36aac01351d36f1c8314 # amqp is required by kombu -amqp==2.2.2 \ - --hash=sha256:4e28d3ea61a64ae61830000c909662cb053642efddbe96503db0e7783a6ee85b \ - --hash=sha256:cba1ace9d4ff6049b190d8b7991f9c1006b443a5238021aca96dd6ad2ac9da22 +amqp==1.4.9 \ + --hash=sha256:e0ed0ce6b8ffe5690a2e856c7908dc557e0e605283d6885dd1361d79f2928908 \ + --hash=sha256:2dea4d16d073c902c3b89d9b96620fb6729ac0f7a923bbc777cb4ad827c0c61a # pyup: >=1.4.9,<2.0 # anyjson is required by kombu anyjson==0.3.3 \ --hash=sha256:37812d863c9ad3e35c0734c42e0bf0320ce8c3bed82cd20ad54cb34d158157ba @@ -91,14 +91,17 @@ asn1crypto==0.23.0 \ --hash=sha256:654b7db3b120e23474e9a1e5e38d268c77e58a9e17d2cb595456c37309846494 \ --hash=sha256:0874981329cfebb366d6584c3d16e913f2a0eb026c9463efcc4aaf42a9d94d70 # billiard is required by celery -billiard==3.5.0.3 \ - --hash=sha256:1d7b22bdc47aa52841120fcd22a74ae4fc8c13e9d3935643098184f5788c3ce6 +billiard==3.3.0.23 \ + --hash=sha256:c0cbe8d45ba8d8213ad68ef9a1881002a151569c9424d551634195a18c3a4160 \ + --hash=sha256:82041dbaa62f7fde1464d7ab449978618a38b241b40c0d31dafabb36446635dc \ + --hash=sha256:958fc9f8fd5cc9b936b2cb9d96f02aa5ec3613ba13ee7f089c77ff0bcc368fac \ + --hash=sha256:692a2a5a55ee39a42bcb7557930e2541da85df9ea81c6e24827f63b80cd39d0b # pyup: ==3.3.0.23 bleach==1.5.0 \ --hash=sha256:e67f46adcec78dbc3c04462f3aba3213a673d5652eba2609ed1ef15492a44b8d \ --hash=sha256:978e758599b54cd3caa2e160d74102879b230ea8dc93871d0783721eef58bc65 # pyup: ==1.5.0 -celery==4.1.0 \ - --hash=sha256:81a67f0d53a688ec2bc8557bd5d6d7218f925a6f2e6df80e01560de9e28997ec \ - --hash=sha256:77ff3730198d6a17b3c1f05579ebe570b579efb35f6d7e13dba3b1368d068b35 +celery==3.1.23 \ + --hash=sha256:eaf5dee3becbc35c7754a2d4482d53bdf72ea3f85dd258525259983262081474 \ + --hash=sha256:1a359c815837f9dbf193a7dbc6addafa34612c077ff70c66e3b16e14eebd2418 # pyup: ==3.1.23 certifi==2017.7.27.1 \ --hash=sha256:54a07c09c586b0e4c619f02a5e94e36619da8e2b053e20f594348c0611803704 \ --hash=sha256:40523d2efb60523e113b44602298f0960e900388cf3bb6043f645cf57ea9e3f5 @@ -296,9 +299,9 @@ jingo_minify==0.7.0 \ --hash=sha256:1427946e8d4082c909a98dbacfe1cd908cbdc83572003e1a0236da02b350e31f \ --hash=sha256:f47ec7868467a1b270e115c301a4270137836ddb7b5f57552e65dff5c86aee05 # kombu is required by celery -kombu==4.1.0 \ - --hash=sha256:01f0da9fe222a2183345004243d1518c0fbe5875955f1b24842f2d9c65709ade \ - --hash=sha256:4249d9dd9dbf1fcec471d1c2def20653c9310dd1a217272d77e4844f9d5273cb +kombu==3.0.35 \ + --hash=sha256:2c59a5e087d5895675cdb4d6a38a0aa147f0411366e68330a76e480ba3b25727 \ + --hash=sha256:22ab336a17962717a5d9470547e5508d4bcf1b6ec10cd9486868daf4e5edb727 # pyup: ==3.0.35 # lxml is required by pyquery lxml==4.0.0 \ --hash=sha256:3593f49858fc6229cd93326be06b099ae477fd65d8f4a981320a6d0bb7fc7a5a \ diff --git a/settings.py b/settings.py index 59e61c28b2..38de2a499c 100644 --- a/settings.py +++ b/settings.py @@ -46,7 +46,7 @@ USE_MOZLOG = False SESSION_COOKIE_SECURE = False SESSION_COOKIE_DOMAIN = None -CELERY_TASK_ALWAYS_EAGER = False +CELERY_ALWAYS_EAGER = False # If you want to allow self-reviews for add-ons/apps, then enable this. # In production we do not want to allow this. diff --git a/settings_test.py b/settings_test.py index baa4aa77d1..5ed79af4b2 100644 --- a/settings_test.py +++ b/settings_test.py @@ -44,7 +44,7 @@ AUTHENTICATION_BACKENDS = ( 'olympia.users.backends.TestUserBackend', ) -CELERY_TASK_ALWAYS_EAGER = True +CELERY_ALWAYS_EAGER = True DEBUG = False # We won't actually send an email. diff --git a/src/olympia/addons/cron.py b/src/olympia/addons/cron.py index bb5d73e027..04c1e896d9 100644 --- a/src/olympia/addons/cron.py +++ b/src/olympia/addons/cron.py @@ -8,7 +8,7 @@ from django.db.models import Q, F, Avg from django.utils.encoding import force_text import multidb -from celery import group +from celery.task.sets import TaskSet import waffle import olympia.core.logger @@ -44,7 +44,7 @@ def update_addon_average_daily_users(): ts = [_update_addon_average_daily_users.subtask(args=[chunk]) for chunk in chunked(d, 250)] - group(ts).apply_async() + TaskSet(ts).apply_async() @task @@ -94,7 +94,7 @@ def update_addon_download_totals(): ts = [_update_addon_download_totals.subtask(args=[chunk]) for chunk in chunked(d, 250)] - group(ts).apply_async() + TaskSet(ts).apply_async() @task @@ -175,7 +175,7 @@ def update_addon_appsupport(): task_log.info('Updating appsupport for %d new-ish addons.' % len(ids)) ts = [_update_appsupport.subtask(args=[chunk]) for chunk in chunked(ids, 20)] - group(ts).apply_async() + TaskSet(ts).apply_async() def update_all_appsupport(): @@ -273,7 +273,7 @@ def reindex_addons(index=None, addon_type=None): ids = ids.filter(type=addon_type) ts = [tasks.index_addons.subtask(args=[chunk], kwargs=dict(index=index)) for chunk in chunked(sorted(list(ids)), 150)] - group(ts).apply_async() + TaskSet(ts).apply_async() def cleanup_image_files(): diff --git a/src/olympia/addons/tests/test_commands.py b/src/olympia/addons/tests/test_commands.py index dc9a667d95..e9192937af 100644 --- a/src/olympia/addons/tests/test_commands.py +++ b/src/olympia/addons/tests/test_commands.py @@ -1,5 +1,3 @@ -from contextlib import contextmanager - import mock from django.conf import settings @@ -10,8 +8,7 @@ import pytest from olympia import amo from olympia.activity.models import AddonLog -from olympia.addons.management.commands import ( - approve_addons, process_addons as pa) +from olympia.addons.management.commands import approve_addons from olympia.addons.models import Addon from olympia.amo.tests import ( addon_factory, AMOPaths, TestCase, version_factory) @@ -217,33 +214,9 @@ def test_process_addons_invalid_task(): call_command('process_addons', task='foo') -@contextmanager -def count_subtask_calls(original_function): - """Mock a celery tasks subtask method and record it's calls. - - You can't mock a celery task `.subtask` method if that task is used - inside a chord or group unfortunately because of some type checking - that is use inside Celery 4+. - - So this wraps the original method and restores it and records the calls - on it's own. - """ - original_function_subtask = original_function.subtask - called = [] - - def _subtask_wrapper(*args, **kwargs): - called.append({'args': args, 'kwargs': kwargs}) - return original_function_subtask(*args, **kwargs) - - original_function.subtask = _subtask_wrapper - - yield called - - original_function.subtask = original_function_subtask - - class AddFirefox57TagTestCase(TestCase): - def test_affects_only_public_webextensions(self): + @mock.patch('olympia.addons.tasks.add_firefox57_tag.subtask') + def test_affects_only_public_webextensions(self, add_firefox57_tag_mock): addon_factory() addon_factory(file_kw={'is_webextension': True, 'status': amo.STATUS_AWAITING_REVIEW}, @@ -252,14 +225,13 @@ class AddFirefox57TagTestCase(TestCase): public_mozilla_signed = addon_factory(file_kw={ 'is_mozilla_signed_extension': True}) - with count_subtask_calls(pa.add_firefox57_tag) as calls: - call_command( - 'process_addons', task='add_firefox57_tag_to_webextensions') + call_command( + 'process_addons', task='add_firefox57_tag_to_webextensions') - assert len(calls) == 1 - assert calls[0]['kwargs']['args'] == [ - [public_webextension.pk, public_mozilla_signed.pk] - ] + assert add_firefox57_tag_mock.call_count == 1 + add_firefox57_tag_mock.assert_called_with( + args=[[public_webextension.pk, public_mozilla_signed.pk]], + kwargs={}) def test_tag_added_for_is_webextension(self): self.addon = addon_factory(file_kw={'is_webextension': True}) @@ -286,7 +258,9 @@ class AddFirefox57TagTestCase(TestCase): class RecalculateWeightTestCase(TestCase): - def test_only_affects_auto_approved(self): + @mock.patch('olympia.editors.tasks.recalculate_post_review_weight.subtask') + def test_only_affects_auto_approved( + self, recalculate_post_review_weight_mock): # Non auto-approved add-on, should not be considered. addon_factory() @@ -317,12 +291,12 @@ class RecalculateWeightTestCase(TestCase): version_factory( addon=auto_approved_addon, channel=amo.RELEASE_CHANNEL_UNLISTED) - with count_subtask_calls(pa.recalculate_post_review_weight) as calls: - call_command( - 'process_addons', task='recalculate_post_review_weight') + call_command( + 'process_addons', task='recalculate_post_review_weight') - assert len(calls) == 1 - assert calls[0]['kwargs']['args'] == [[auto_approved_addon.pk]] + assert recalculate_post_review_weight_mock.call_count == 1 + recalculate_post_review_weight_mock.assert_called_with( + args=[[auto_approved_addon.pk]], kwargs={}) def test_task_works_correctly(self): addon = addon_factory(average_daily_users=100000) @@ -348,7 +322,9 @@ class BumpAppVerForLegacyAddonsTestCase(AMOPaths, TestCase): self.firefox_for_android_56_star, _ = AppVersion.objects.get_or_create( application=amo.ANDROID.id, version='56.*') - def test_only_affects_legacy_addons_targeting_firefox_lower_than_56(self): + @mock.patch('olympia.addons.tasks.bump_appver_for_legacy_addons.subtask') + def test_only_affects_legacy_addons_targeting_firefox_lower_than_56( + self, bump_appver_for_legacy_addons_mock): # Should be included: addon = addon_factory(version_kw={'max_app_version': '55.*'}) addon2 = addon_factory(version_kw={'application': amo.ANDROID.id}) @@ -367,13 +343,11 @@ class BumpAppVerForLegacyAddonsTestCase(AMOPaths, TestCase): ApplicationsVersions.objects.get_or_create( application=amo.FIREFOX.id, version=weird_addon.current_version, min=av_min, max=self.firefox_56_star) - - with count_subtask_calls(pa.bump_appver_for_legacy_addons) as calls: - call_command( - 'process_addons', task='bump_appver_for_legacy_addons') - - assert len(calls) == 1 - assert calls[0]['kwargs']['args'] == [[addon.pk, addon2.pk, addon3.pk]] + call_command('process_addons', task='bump_appver_for_legacy_addons') + assert bump_appver_for_legacy_addons_mock.call_count == 1 + assert ( + bump_appver_for_legacy_addons_mock.call_args[1]['args'] == + [[addon.pk, addon2.pk, addon3.pk]]) @mock.patch('olympia.addons.tasks.index_addons.delay') @mock.patch('olympia.addons.tasks.bump_appver_for_addon_if_necessary') diff --git a/src/olympia/amo/celery.py b/src/olympia/amo/celery.py index 91b15cbfca..af36b182a5 100644 --- a/src/olympia/amo/celery.py +++ b/src/olympia/amo/celery.py @@ -24,8 +24,8 @@ log = olympia.core.logger.getLogger('z.task') app = Celery('olympia') task = app.task -app.config_from_object('django.conf:settings', namespace='CELERY') -app.autodiscover_tasks() +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(settings.INSTALLED_APPS) # Hook up Sentry in celery. raven_client = Client(settings.SENTRY_DSN) diff --git a/src/olympia/amo/tests/test_send_mail.py b/src/olympia/amo/tests/test_send_mail.py index 825cc4c64b..330730feee 100644 --- a/src/olympia/amo/tests/test_send_mail.py +++ b/src/olympia/amo/tests/test_send_mail.py @@ -6,7 +6,6 @@ from django.core import mail from django.core.files.storage import default_storage as storage from django.core.mail import EmailMessage from django.utils import translation -from celery.exceptions import Retry import mock @@ -281,12 +280,10 @@ class TestSendMail(BaseTestCase): send_mail('test subject', 'test body', recipient_list=['somebody@mozilla.org']) - - with self.assertRaises(Retry): - send_mail('test subject', - 'test body', - async=True, - recipient_list=['somebody@mozilla.org']) + send_mail('test subject', + 'test body', + async=True, + recipient_list=['somebody@mozilla.org']) @mock.patch('olympia.amo.tasks.EmailMessage') def test_async_will_stop_retrying(self, backend): diff --git a/src/olympia/bandwagon/cron.py b/src/olympia/bandwagon/cron.py index d90c27cc2b..b06d2c984f 100644 --- a/src/olympia/bandwagon/cron.py +++ b/src/olympia/bandwagon/cron.py @@ -3,7 +3,7 @@ from datetime import date from django.db import connection from django.db.models import Count -from celery import group +from celery.task.sets import TaskSet import olympia.core.logger from olympia import amo @@ -25,7 +25,7 @@ def update_collections_subscribers(): ts = [_update_collections_subscribers.subtask(args=[chunk]) for chunk in chunked(d, 1000)] - group(ts).apply_async() + TaskSet(ts).apply_async() @task(rate_limit='15/m') @@ -66,11 +66,11 @@ def update_collections_votes(): ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_up']) for chunk in chunked(up, 1000)] - group(ts).apply_async() + TaskSet(ts).apply_async() ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_down']) for chunk in chunked(down, 1000)] - group(ts).apply_async() + TaskSet(ts).apply_async() @task(rate_limit='15/m') @@ -100,4 +100,4 @@ def reindex_collections(index=None): taskset = [tasks.index_collections.subtask(args=[chunk], kwargs=dict(index=index)) for chunk in chunked(sorted(list(ids)), 150)] - group(taskset).apply_async() + TaskSet(taskset).apply_async() diff --git a/src/olympia/conf/dev/settings.py b/src/olympia/conf/dev/settings.py index 7df05e8dac..d04d6d723e 100644 --- a/src/olympia/conf/dev/settings.py +++ b/src/olympia/conf/dev/settings.py @@ -100,11 +100,11 @@ SECRET_KEY = env('SECRET_KEY') LOG_LEVEL = logging.DEBUG # Celery -CELERY_BROKER_URL = env('CELERY_BROKER_URL') +BROKER_URL = env('BROKER_URL') -CELERY_TASK_IGNORE_RESULT = True -CELERY_WORKER_DISABLE_RATE_LIMITS = True -CELERY_WORKER_PREFETCH_MULTIPLIER = 1 +CELERY_IGNORE_RESULT = True +CELERY_DISABLE_RATE_LIMITS = True +CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND') NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT') diff --git a/src/olympia/conf/prod/settings.py b/src/olympia/conf/prod/settings.py index 3bfbbcf632..8dfccb3f3c 100644 --- a/src/olympia/conf/prod/settings.py +++ b/src/olympia/conf/prod/settings.py @@ -85,11 +85,11 @@ SECRET_KEY = env('SECRET_KEY') # Celery -CELERY_BROKER_URL = env('CELERY_BROKER_URL') +BROKER_URL = env('BROKER_URL') -CELERY_TASK_IGNORE_RESULT = True -CELERY_WORKER_DISABLE_RATE_LIMITS = True -CELERY_BROKER_CONNECTION_TIMEOUT = 0.5 +CELERY_IGNORE_RESULT = True +CELERY_DISABLE_RATE_LIMITS = True +BROKER_CONNECTION_TIMEOUT = 0.5 CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND') NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT') diff --git a/src/olympia/conf/stage/settings.py b/src/olympia/conf/stage/settings.py index e73208f552..a170e17fbf 100644 --- a/src/olympia/conf/stage/settings.py +++ b/src/olympia/conf/stage/settings.py @@ -97,11 +97,11 @@ SECRET_KEY = env('SECRET_KEY') LOG_LEVEL = logging.DEBUG # Celery -CELERY_BROKER_URL = env('CELERY_BROKER_URL') +BROKER_URL = env('BROKER_URL') -CELERY_TASK_IGNORE_RESULT = True -CELERY_WORKER_DISABLE_RATE_LIMITS = True -CELERY_WORKER_PREFETCH_MULTIPLIER = 1 +CELERY_IGNORE_RESULT = True +CELERY_DISABLE_RATE_LIMITS = True +CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND') NETAPP_STORAGE_ROOT = env('NETAPP_STORAGE_ROOT') diff --git a/src/olympia/editors/management/commands/versionlog.py b/src/olympia/editors/management/commands/versionlog.py index 596f1effc7..7d44ff48c9 100644 --- a/src/olympia/editors/management/commands/versionlog.py +++ b/src/olympia/editors/management/commands/versionlog.py @@ -1,6 +1,6 @@ from django.core.management.base import BaseCommand -from celery import group +from celery.task.sets import TaskSet from olympia.amo.utils import chunked from olympia.activity.models import ActivityLog @@ -16,4 +16,4 @@ class Command(BaseCommand): ts = [add_versionlog.subtask(args=[chunk]) for chunk in chunked(pks, 100)] - group(ts).apply_async() + TaskSet(ts).apply_async() diff --git a/src/olympia/lib/settings_base.py b/src/olympia/lib/settings_base.py index 35f2f0ef6f..e02bf4ab0d 100644 --- a/src/olympia/lib/settings_base.py +++ b/src/olympia/lib/settings_base.py @@ -1081,33 +1081,23 @@ VALIDATION_FAQ_URL = ('https://wiki.mozilla.org/Add-ons/Reviewers/Guide/' # Celery -CELERY_BROKER_URL = os.environ.get( - 'CELERY_BROKER_URL', - 'amqp://olympia:olympia@localhost:5672/olympia') -CELERY_BROKER_CONNECTION_TIMEOUT = 0.1 -CELERY_BROKER_HEARTBEAT = 60 * 15 -CELERY_TASK_DEFAULT_QUEUE = 'default' -CELERY_RESULT_BACKEND = os.environ.get( - 'CELERY_RESULT_BACKEND', 'redis://localhost:6379/1') +BROKER_URL = os.environ.get('BROKER_URL', + 'amqp://olympia:olympia@localhost:5672/olympia') +BROKER_CONNECTION_TIMEOUT = 0.1 +BROKER_HEARTBEAT = 60 * 15 +CELERY_DEFAULT_QUEUE = 'default' +CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', + 'redis://localhost:6379/1') -CELERY_TASK_IGNORE_RESULT = True +CELERY_IGNORE_RESULT = True CELERY_SEND_TASK_ERROR_EMAILS = True -CELERY_WORKER_HIJACK_ROOT_LOGGER = False - -# Allow upgrading to Celery 4.x (JSON only) -# Explicitly force task serializer and result serializer to create tasks -# using the JSON format but still accept pickled messages. -# TODO: Remove `pickle` once the upgrade is done and seems stable. -CELERY_ACCEPT_CONTENT = ['pickle', 'json'] -CELERY_TASK_SERIALIZER = 'json' -CELERY_RESULT_SERIALIZER = 'json' - +CELERYD_HIJACK_ROOT_LOGGER = False CELERY_IMPORTS = ( 'olympia.lib.crypto.tasks', 'olympia.lib.es.management.commands.reindex', ) -CELERY_TASK_QUEUES = ( +CELERY_QUEUES = ( Queue('default', routing_key='default'), Queue('priority', routing_key='priority'), Queue('devhub', routing_key='devhub'), @@ -1132,7 +1122,7 @@ CELERY_TASK_QUEUES = ( # Some notes: # - always add routes here instead of @task(queue=) # - when adding a queue, be sure to update deploy.py so that it gets restarted -CELERY_TASK_ROUTES = { +CELERY_ROUTES = { # Priority. # If your tasks need to be run as soon as possible, add them here so they # are routed to the priority queue. @@ -1288,12 +1278,12 @@ CELERY_TIME_LIMITS = { } # When testing, we always want tasks to raise exceptions. Good for sanity. -CELERY_TASK_EAGER_PROPAGATES = True +CELERY_EAGER_PROPAGATES_EXCEPTIONS = True # Time in seconds before celery.exceptions.SoftTimeLimitExceeded is raised. # The task can catch that and recover but should exit ASAP. Note that there is # a separate, shorter timeout for validation tasks. -CELERY_TASK_SOFT_TIME_LIMIT = 60 * 30 +CELERYD_TASK_SOFT_TIME_LIMIT = 60 * 30 # Logging LOG_LEVEL = logging.DEBUG @@ -1555,7 +1545,7 @@ LOGIN_RATELIMIT_ALL_USERS = '15/m' CSRF_FAILURE_VIEW = 'olympia.amo.views.csrf_failure' # Testing responsiveness without rate limits. -CELERY_WORKER_DISABLE_RATE_LIMITS = True +CELERY_DISABLE_RATE_LIMITS = True # Default file storage mechanism that holds media. DEFAULT_FILE_STORAGE = 'olympia.amo.utils.LocalFileStorage' diff --git a/src/olympia/lib/tests/test_celery.py b/src/olympia/lib/tests/test_celery.py index 846c7cb961..2523bae59b 100644 --- a/src/olympia/lib/tests/test_celery.py +++ b/src/olympia/lib/tests/test_celery.py @@ -2,14 +2,12 @@ from django.conf import settings def test_celery_routes_in_queues(): - queues_in_queues = set([q.name for q in settings.CELERY_TASK_QUEUES]) - + queues_in_queues = set([q.name for q in settings.CELERY_QUEUES]) # check the default queue is defined in CELERY_QUEUES - assert settings.CELERY_TASK_DEFAULT_QUEUE in queues_in_queues - + assert settings.CELERY_DEFAULT_QUEUE in queues_in_queues # then remove it as it won't be in CELERY_ROUTES - queues_in_queues.remove(settings.CELERY_TASK_DEFAULT_QUEUE) + queues_in_queues.remove(settings.CELERY_DEFAULT_QUEUE) queues_in_routes = set( - [c['queue'] for c in settings.CELERY_TASK_ROUTES.values()]) + [c['queue'] for c in settings.CELERY_ROUTES.values()]) assert queues_in_queues == queues_in_routes diff --git a/src/olympia/stats/cron.py b/src/olympia/stats/cron.py index 699204e4a3..40033eae84 100644 --- a/src/olympia/stats/cron.py +++ b/src/olympia/stats/cron.py @@ -4,7 +4,7 @@ from django.core.management import call_command from django.db.models import Sum, Max import waffle -from celery import group +from celery.task.sets import TaskSet import olympia.core.logger from olympia.amo.utils import chunked @@ -26,7 +26,7 @@ def update_addons_collections_downloads(): ts = [tasks.update_addons_collections_downloads.subtask(args=[chunk]) for chunk in chunked(d, 100)] - group(ts).apply_async() + TaskSet(ts).apply_async() def update_collections_total(): @@ -37,7 +37,7 @@ def update_collections_total(): ts = [tasks.update_collections_total.subtask(args=[chunk]) for chunk in chunked(d, 50)] - group(ts).apply_async() + TaskSet(ts).apply_async() def update_global_totals(date=None): @@ -57,7 +57,7 @@ def update_global_totals(date=None): ts = [tasks.update_global_totals.subtask(kwargs=kw) for kw in today_jobs + metrics_jobs] - group(ts).apply_async() + TaskSet(ts).apply_async() def update_google_analytics(date=None): diff --git a/src/olympia/users/cron.py b/src/olympia/users/cron.py index fd5af09361..7233cae51e 100644 --- a/src/olympia/users/cron.py +++ b/src/olympia/users/cron.py @@ -1,7 +1,7 @@ from django.db import connections import multidb -from celery import group +from celery.task.sets import TaskSet import olympia.core.logger @@ -42,5 +42,4 @@ def update_user_ratings(): ts = [update_user_ratings_task.subtask(args=[chunk]) for chunk in chunked(d, 1000)] - - group(ts).apply_async() + TaskSet(ts).apply_async()