diff --git a/.gitignore b/.gitignore index caeade31..ed0a7f6a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .env +.local *.pyc .DS_Store docs/_build diff --git a/atmo/apps.py b/atmo/apps.py index 2fd25196..7484621e 100644 --- a/atmo/apps.py +++ b/atmo/apps.py @@ -3,68 +3,14 @@ # file, you can obtain one at http://mozilla.org/MPL/2.0/. import logging -import django_rq -import redis import session_csrf from django.apps import AppConfig -from django.utils.module_loading import import_string DEFAULT_JOB_TIMEOUT = 15 logger = logging.getLogger("django") -job_schedule = { - 'delete_clusters': { - 'cron_string': '* * * * *', - 'func': 'atmo.clusters.jobs.delete_clusters', - 'timeout': 15, - }, - 'send_expiration_mails': { - 'cron_string': '*/5 * * * *', - 'func': 'atmo.clusters.jobs.send_expiration_mails', - 'timeout': 60, - }, - 'send_run_alert_mails': { - 'cron_string': '* * * * *', - 'func': 'atmo.jobs.jobs.send_run_alert_mails', - 'timeout': 60, - }, - 'update_clusters_info': { - 'cron_string': '* * * * *', - 'func': 'atmo.clusters.jobs.update_clusters_info', - 'timeout': 15, - }, - 'run_jobs': { - 'cron_string': '*/5 * * * *', - 'func': 'atmo.jobs.jobs.run_jobs', - 'timeout': 45, - }, - 'clean_orphan_obj_perms': { - 'cron_string': '30 3 * * *', - 'func': 'guardian.utils.clean_orphan_obj_perms', - 'timeout': 60, - } -} - - -def register_job_schedule(): - """ - Register the RQ job schedule, and cancel all the old ones - """ - scheduler = django_rq.get_scheduler() - for job_id, params in list(job_schedule.items()): - scheduler.cron( - params['cron_string'], - id=job_id, - func=import_string(params['func']), - timeout=params.get('timeout', DEFAULT_JOB_TIMEOUT) - ) - for job in scheduler.get_jobs(): - if job.id not in job_schedule: - scheduler.cancel(job) - - class AtmoAppConfig(AppConfig): name = 'atmo' @@ -79,15 +25,6 @@ class AtmoAppConfig(AppConfig): # https://github.com/mozilla/sugardough/issues/38 session_csrf.monkeypatch() - # Register rq scheduled jobs, if Redis is available - try: - connection = django_rq.get_connection('default') - connection.ping() - except redis.ConnectionError: - logger.warning('Could not connect to Redis, not reigstering RQ jobs') - else: - register_job_schedule() - class KeysAppConfig(AppConfig): name = 'atmo.keys' diff --git a/atmo/celery.py b/atmo/celery.py new file mode 100644 index 00000000..6ceef6f0 --- /dev/null +++ b/atmo/celery.py @@ -0,0 +1,92 @@ +import functools +import os +import random +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'atmo.settings') +os.environ.setdefault('DJANGO_CONFIGURATION', 'Dev') + +import configurations # noqa +configurations.setup() + + +class ExpoBackoffFullJitter: + """ + Implement fully jittered exponential retries. + + See for more infos: + + - https://www.awsarchitectureblog.com/2015/03/backoff.html + - https://github.com/awslabs/aws-arch-backoff-simulator + + Copyright 2015 Amazon + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + """ + def __init__(self, base, cap): + self.base = base + self.cap = cap + + def expo(self, n): + return min(self.cap, pow(2, n) * self.base) + + def backoff(self, n): + v = self.expo(n) + return random.uniform(0, v) + + +class AtmoCelery(Celery): + """ + A custom Celery class to implement exponential backoff retries. + """ + def autoretry_task(self, autoretry_on=None, *args, **opts): + if autoretry_on is None: + autoretry_on = Exception + super_task = super().task + + def decorator(func): + bind_passed = 'bind' in opts + opts['bind'] = True + + @super_task(*args, **opts) + @functools.wraps(func) + def wrapper(task, *fargs, **fkwargs): + try: + if bind_passed: + return func(task, *fargs, **fkwargs) + else: + return func(*fargs, **fkwargs) + except autoretry_on as exc: + backoff = ExpoBackoffFullJitter(base=1, cap=60 * 60) + countdown = backoff.backoff(task.request.retries) + task.retry( + exc=exc, + args=fargs, + kwargs=fkwargs, + countdown=countdown, + ) + return wrapper + return decorator + + +celery = AtmoCelery('atmo') + +# Using a string here means the worker don't have to serialize +# the configuration object to child processes. +# - namespace='CELERY' means all celery-related configuration keys +# should have a `CELERY_` prefix. +celery.config_from_object('django.conf:settings', namespace='CELERY') + +# Load task modules from all registered Django celery configs. +celery.autodiscover_tasks() diff --git a/atmo/clusters/management/commands/delete_clusters.py b/atmo/clusters/management/commands/deactivate_clusters.py similarity index 88% rename from atmo/clusters/management/commands/delete_clusters.py rename to atmo/clusters/management/commands/deactivate_clusters.py index 445fb108..f61cd000 100644 --- a/atmo/clusters/management/commands/delete_clusters.py +++ b/atmo/clusters/management/commands/deactivate_clusters.py @@ -3,7 +3,7 @@ # file, you can obtain one at http://mozilla.org/MPL/2.0/. from django.core.management.base import BaseCommand -from ...jobs import delete_clusters +from ...jobs import deactivate_clusters class Command(BaseCommand): @@ -11,5 +11,5 @@ class Command(BaseCommand): def handle(self, *args, **options): self.stdout.write('Deleting expired clusters...', ending='') - delete_clusters() + deactivate_clusters() self.stdout.write('done.') diff --git a/atmo/clusters/management/commands/update_clusters_info.py b/atmo/clusters/management/commands/update_clusters.py similarity index 87% rename from atmo/clusters/management/commands/update_clusters_info.py rename to atmo/clusters/management/commands/update_clusters.py index ff08d458..e1024a48 100644 --- a/atmo/clusters/management/commands/update_clusters_info.py +++ b/atmo/clusters/management/commands/update_clusters.py @@ -3,7 +3,7 @@ # file, you can obtain one at http://mozilla.org/MPL/2.0/. from django.core.management.base import BaseCommand -from ...jobs import update_clusters_info +from ...jobs import update_clusters class Command(BaseCommand): @@ -11,5 +11,5 @@ class Command(BaseCommand): def handle(self, *args, **options): self.stdout.write('Updating cluster info...', ending='') - update_clusters_info() + update_clusters() self.stdout.write('done.') diff --git a/atmo/clusters/jobs.py b/atmo/clusters/tasks.py similarity index 86% rename from atmo/clusters/jobs.py rename to atmo/clusters/tasks.py index e51bf116..7a824108 100644 --- a/atmo/clusters/jobs.py +++ b/atmo/clusters/tasks.py @@ -3,26 +3,29 @@ # file, you can obtain one at http://mozilla.org/MPL/2.0/. from datetime import timedelta -import django_rq -import newrelic.agent from django.conf import settings +from django.db import transaction from django.template.loader import render_to_string from django.utils import timezone from .. import email +from ..celery import celery from .models import Cluster from .provisioners import ClusterProvisioner -@newrelic.agent.background_task(group='RQ') -def delete_clusters(): +@celery.task +def deactivate_clusters(): now = timezone.now() + deactivated_clusters = [] for cluster in Cluster.objects.active().filter(end_date__lte=now): + deactivated_clusters.append([cluster.identifier, cluster.pk]) # The cluster is expired cluster.deactivate() + return deactivated_clusters -@newrelic.agent.background_task(group='RQ') +@celery.task def send_expiration_mails(): deadline = timezone.now() + timedelta(hours=1) soon_expired = Cluster.objects.active().filter( @@ -47,7 +50,7 @@ def send_expiration_mails(): cluster.save() -@newrelic.agent.background_task(group='RQ') +@celery.autoretry_task() def update_master_address(cluster_id, force=False): """ Update the public IP address for the cluster with the given cluster ID @@ -65,8 +68,8 @@ def update_master_address(cluster_id, force=False): cluster.save() -@newrelic.agent.background_task(group='RQ') -def update_clusters_info(): +@celery.autoretry_task() +def update_clusters(): """ Update the cluster metadata from AWS for the pending clusters. @@ -80,7 +83,7 @@ def update_clusters_info(): # Short-circuit for no active clusters (e.g. on weekends) if not active_clusters.exists(): - return + return [] # get the start dates of the active clusters, set to the start of the day # to counteract time differences between atmo and AWS and use the oldest @@ -94,6 +97,7 @@ def update_clusters_info(): cluster_mapping[cluster_info['jobflow_id']] = cluster_info # go through pending clusters and update the state if needed + updated_clusters = [] for cluster in active_clusters: info = cluster_mapping.get(cluster.jobflow_id) # ignore if no info was found for some reason, @@ -109,6 +113,11 @@ def update_clusters_info(): cluster.most_recent_status = info['state'] cluster.save() + updated_clusters.append(cluster.identifier) + # if not given enqueue a job to update the public IP address if not cluster.master_address: - django_rq.enqueue(update_master_address, cluster.id) + transaction.on_commit( + lambda: update_master_address.delay(cluster.id) + ) + return updated_clusters diff --git a/atmo/jobs/jobs.py b/atmo/jobs/tasks.py similarity index 95% rename from atmo/jobs/jobs.py rename to atmo/jobs/tasks.py index 4a706fce..a2e2a497 100644 --- a/atmo/jobs/jobs.py +++ b/atmo/jobs/tasks.py @@ -3,12 +3,12 @@ # file, you can obtain one at http://mozilla.org/MPL/2.0/. import logging -import newrelic.agent from django.conf import settings from django.db import transaction from django.template.loader import render_to_string from django.utils import timezone +from atmo.celery import celery from atmo.clusters.models import Cluster from atmo.clusters.provisioners import ClusterProvisioner @@ -18,7 +18,7 @@ from .models import SparkJob, SparkJobRunAlert logger = logging.getLogger(__name__) -@newrelic.agent.background_task(group='RQ') +@celery.autoretry_task() def run_jobs(): """ Run all the scheduled tasks that are supposed to run. @@ -75,14 +75,15 @@ def run_jobs(): return run_jobs -@newrelic.agent.background_task(group='RQ') +@celery.task def send_run_alert_mails(): failed_run_alerts = SparkJobRunAlert.objects.filter( reason_code__in=Cluster.FAILED_STATE_CHANGE_REASON_LIST, mail_sent_date__isnull=True, ).prefetch_related('run__spark_job__created_by') - + failed_jobs = [] for alert in failed_run_alerts: + failed_jobs.append(alert.run.spark_job.identifier) subject = '[ATMO] Running Spark job %s failed' % alert.run.spark_job.identifier body = render_to_string( 'atmo/jobs/mails/failed_run_alert_body.txt', { @@ -98,3 +99,4 @@ def send_run_alert_mails(): ) alert.mail_sent_date = timezone.now() alert.save() + return failed_jobs diff --git a/atmo/settings.py b/atmo/settings.py index 99179985..ea0b2223 100644 --- a/atmo/settings.py +++ b/atmo/settings.py @@ -10,10 +10,12 @@ https://docs.djangoproject.com/en/1.9/topics/settings/ For the full list of settings and their values, see https://docs.djangoproject.com/en/1.9/ref/settings/ """ +import logging import os import subprocess from datetime import timedelta +from celery.schedules import crontab from configurations import Configuration, values from django.contrib.messages import constants as messages from django.core.urlresolvers import reverse_lazy @@ -21,6 +23,80 @@ from dockerflow.version import get_version from raven.transport.requests import RequestsHTTPTransport +class Celery: + CELERY_BROKER_TRANSPORT_OPTIONS = { + # only send messages to actual virtual AMQP host instead of all + 'fanout_prefix': True, + # have the workers only subscribe to worker related events (less network traffic) + 'fanout_patterns': True, + # 8 days, since that's longer than our biggest interval to schedule a task (a week) + # this is needed to be able to use ETAs and countdowns + # http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#id1 + 'visibility_timeout': 8 * 24 * 60 * 60, + } + # Use the django_celery_results database backend. + CELERY_RESULT_BACKEND = 'django-db' + # Throw away task results after two weeks, for debugging purposes. + CELERY_RESULT_EXPIRES = timedelta(days=14) + # Track if a task has been started, not only pending etc. + CELERY_TASK_TRACK_STARTED = True + # Add a 1 minute soft timeout to all Celery tasks. + CELERY_TASK_SOFT_TIME_LIMIT = 60 + # And a 2 minute hard timeout. + CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT * 2 + # Send SENT events as well to know when the task has left the scheduler. + CELERY_TASK_SEND_SENT_EVENT = True + # Stop hijacking the root logger so Sentry works. + CELERY_WORKER_HIJACK_ROOT_LOGGER = False + # Use the django_celery_beat scheduler for database based schedules. + CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' + # The default/initial schedule to use. + CELERY_BEAT_SCHEDULE = { + 'deactivate_clusters': { + 'schedule': crontab(minute='*'), + 'task': 'atmo.clusters.tasks.deactivate_clusters', + 'options': { + 'soft_time_limit': 15, + 'expires': 40, + }, + }, + 'send_expiration_mails': { + 'schedule': crontab(minute='*/5'), # every 5 minutes + 'task': 'atmo.clusters.tasks.send_expiration_mails', + 'options': { + 'expires': 4 * 60, + }, + }, + 'send_run_alert_mails': { + 'schedule': crontab(minute='*'), + 'task': 'atmo.jobs.tasks.send_run_alert_mails', + 'options': { + 'expires': 40, + }, + }, + 'update_clusters': { + 'schedule': crontab(minute='*'), + 'task': 'atmo.clusters.tasks.update_clusters', + 'options': { + 'soft_time_limit': 15, + 'expires': 40, + }, + }, + 'run_jobs': { + 'schedule': crontab(minute='*/5'), + 'task': 'atmo.jobs.tasks.run_jobs', + 'options': { + 'soft_time_limit': 45, + 'expires': 40, + }, + }, + 'clean_orphan_obj_perms': { + 'schedule': crontab(minute=30, hour=3), + 'task': 'guardian.utils.clean_orphan_obj_perms', + } + } + + class Constance: "Constance settings" CONSTANCE_BACKEND = 'constance.backends.redisd.RedisBackend' @@ -129,7 +205,7 @@ class CSP: ) -class Core(Constance, CSP, AWS, Configuration): +class Core(AWS, Celery, Constance, CSP, Configuration): """Settings that will never change per-environment.""" # Build paths inside the project like this: os.path.join(BASE_DIR, ...) @@ -150,7 +226,6 @@ class Core(Constance, CSP, AWS, Configuration): 'atmo.users', # Third party apps - 'django_rq', 'allauth', 'allauth.account', 'allauth.socialaccount', @@ -158,6 +233,8 @@ class Core(Constance, CSP, AWS, Configuration): 'constance', 'constance.backends.database', 'dockerflow.django', + 'django_celery_results', + 'django_celery_beat', # Django apps 'django.contrib.sites', @@ -187,19 +264,6 @@ class Core(Constance, CSP, AWS, Configuration): WSGI_APPLICATION = 'atmo.wsgi.application' - RQ_SHOW_ADMIN_LINK = True - - # set the exponential backoff mechanism of rq-retry - def exponential_backoff(tries, base=2): - return ','.join([str(pow(base, exponent)) for exponent in range(tries)]) - - # the total number of tries for each task, 1 regular try + 5 retries - RQ_RETRY_MAX_TRIES = 6 - os.environ['RQ_RETRY_MAX_TRIES'] = str(RQ_RETRY_MAX_TRIES) - - # this needs to be set as an environment variable since that's how rq-retry works - os.environ['RQ_RETRY_DELAYS'] = RQ_RETRY_DELAYS = exponential_backoff(RQ_RETRY_MAX_TRIES - 1) - # Add the django-allauth authentication backend. AUTHENTICATION_BACKENDS = ( 'django.contrib.auth.backends.ModelBackend', @@ -348,17 +412,14 @@ class Base(Core): # https://docs.djangoproject.com/en/1.9/ref/settings/#databases DATABASES = values.DatabaseURLValue('postgres://postgres@db/postgres') - RQ_QUEUES = { - 'default': { - 'USE_REDIS_CACHE': 'default', - } - } - + REDIS_URL_DEFAULT = 'redis://redis:6379/1' CACHES = values.CacheURLValue( - 'redis://redis:6379/1', + REDIS_URL_DEFAULT, environ_prefix=None, environ_name='REDIS_URL', ) + # Use redis as the Celery broker. + CELERY_BROKER_URL = os.environ.get('REDIS_URL', REDIS_URL_DEFAULT) LOGGING_USE_JSON = values.BooleanValue(False) @@ -411,16 +472,6 @@ class Base(Core): 'handlers': ['console'], 'propagate': False, }, - 'rq': { - 'handlers': ['console', 'sentry'], - 'level': 'INFO', - 'propagate': False, - }, - 'rq.worker': { - 'handlers': ['console', 'sentry'], - 'level': 'INFO', - 'propagate': False, - }, 'request.summary': { 'handlers': ['console'], 'level': 'DEBUG', @@ -498,6 +549,7 @@ class Stage(Base): # Sentry setup SENTRY_DSN = values.Value(environ_prefix=None) SENTRY_PUBLIC_DSN = values.Value(environ_prefix=None) + SENTRY_CELERY_LOGLEVEL = logging.INFO MIDDLEWARE_CLASSES = ( 'raven.contrib.django.raven_compat.middleware.SentryResponseErrorIdMiddleware', diff --git a/atmo/urls.py b/atmo/urls.py index 7b32dd9f..320cb8a9 100644 --- a/atmo/urls.py +++ b/atmo/urls.py @@ -16,7 +16,6 @@ admin.site.login = login_required(admin.site.login) urlpatterns = [ url(r'^$', views.dashboard, name='dashboard'), - url(r'^admin/rq/', include('django_rq.urls')), url(r'^admin/', include(admin.site.urls)), url(r'clusters/', include('atmo.clusters.urls')), diff --git a/bin/run b/bin/run index 9f6d53dd..cceff8a7 100755 --- a/bin/run +++ b/bin/run @@ -40,10 +40,11 @@ case $1 in exec python manage.py runserver 0.0.0.0:${PORT} ;; worker) - exec newrelic-admin run-python manage.py rqworker --worker-class=rq_retry.RetryWorker default + exec newrelic-admin run-program celery -A atmo.celery:celery worker -l info -O fair ;; scheduler) - exec newrelic-admin run-python manage.py rqscheduler + python manage.py migrate --noinput + exec newrelic-admin run-program celery -A atmo.celery:celery beat -l info --pidfile /app/celerybeat.pid ;; test) python manage.py collectstatic --noinput diff --git a/newrelic.ini b/newrelic.ini index b46b00d1..01b86b2b 100644 --- a/newrelic.ini +++ b/newrelic.ini @@ -166,7 +166,7 @@ error_collector.enabled = true # To stop specific errors from reporting to the UI, set this to # a space separated list of the Python exception type names to # ignore. The exception name should be of the form 'module:class'. -error_collector.ignore_errors = +error_collector.ignore_errors = celery.exceptions:Retry celery.exceptions:RetryTaskError # Browser monitoring is the Real User Monitoring feature of the UI. # For those Python web frameworks that are supported, this diff --git a/requirements.txt b/requirements.txt index d2e2fb84..6a097a68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -109,17 +109,6 @@ pbr==2.0.0 \ --hash=sha256:0ccd2db529afd070df815b1521f01401d43de03941170f8a800e7531faba265d redis==2.10.5 \ --hash=sha256:97156b37d7cda4e7d8658be1148c983984e1a975090ba458cc7e244025191dbd -click==6.7 \ - --hash=sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d \ - --hash=sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b -rq==0.7.1 \ - --hash=sha256:6c36f146914f6a28009dbcee0c4610600afd1f4f493ce3a2afbd467076cddaac \ - --hash=sha256:b0e98fcfe980cbc7644447d17ea2c177fcbd5c04f1f92d5136c47f00ed2d583d -django-rq==0.9.5 \ - --hash=sha256:74e5ba53d3a2ade0e652a1e2f35d7a6f3b15022433c24fbcb977198e735ed9e7 \ - --hash=sha256:33f9cb5d26b6cb3d2d7f8b25494b4985e09227cb80058627443bdb6bdb99e1db -croniter==0.3.15 \ - --hash=sha256:ac0c9811ebdecd27bc29eb0711e2d87c8c280a53d0e92c48b600654d23b1d541 django-allauth==0.31.0 \ --hash=sha256:edfff4599cd5431e4af380ddb9fc66e7b923157dcac67cceb5604b3c58003907 python3-openid==3.1.0 \ @@ -330,11 +319,6 @@ msgpack-python==0.4.8 \ defusedxml==0.5.0 \ --hash=sha256:702a91ade2968a82beb0db1e0766a6a273f33d4616a6ce8cde475d8e09853b20 \ --hash=sha256:24d7f2f94f7f3cb6061acb215685e5125fbcdc40a857eff9de22518820b0a4f4 -https://github.com/ui/rq-scheduler/archive/5dd071328ee3eeaad67ecf7cb2550ef8fa08a787.zip#egg=rq-scheduler \ - --hash=sha256:837c74ca0c42632a796bcf954dcde5515c17168c4a27d169f49cbc6ed880df4e -rq-retry==0.3.0 \ - --hash=sha256:a0ecb39b02c9df3bac9ef88cda2d93cefb9120e3d8ccef920b38a851330e917f \ - --hash=sha256:c8cafedf3b11b931f422b12b304c9815cc84c3d145cd05aff337a0b973d0c000 asn1crypto==0.22.0 \ --hash=sha256:d232509fefcfcdb9a331f37e9c9dc20441019ad927c7d2176cf18ed5da0ba097 \ --hash=sha256:cbbadd640d3165ab24b06ef25d1dca09a3441611ac15f6a6b452474fdf0aed1a @@ -357,3 +341,29 @@ future==0.16.0 \ pytest-flake8==0.8.1 \ --hash=sha256:aa10a6db147485d71dad391d4149388904c3072194d51755f64784ff128845fd \ --hash=sha256:8efaf4595a13079197ac740a12e6a87e3403f08133a42d3ac5984474f6f91681 +celery==4.0.2 \ + --hash=sha256:0e5b7e0d7f03aa02061abfd27aa9da05b6740281ca1f5228a54fbf7fe74d8afa \ + --hash=sha256:e3d5a6c56a73ff8f2ddd4d06dc37f4c2afe4bb4da7928b884d0725ea865ef54d +billiard==3.5.0.2 \ + --hash=sha256:82478ebdd3cd4d357613cb4735eb828014364c5e02d3bba96ea6c1565a2c0172 \ + --hash=sha256:a93c90d59fca62ad63f92b3d2bf1d752c154dde90a3100dba4c8e439386e534c \ + --hash=sha256:d8df4b276b11b3e2fe25652e411487bda6e5bac4f8fd236a278a2bfe300f7c43 \ + --hash=sha256:e740e352bbf7b6c8cc92a2596cc2da2bfb4ab1009aeb68bf844456af4e924278 \ + --hash=sha256:52c2e01c95c6edae9ca1f48d83503e6ceeafbf28c19929a1e917fe951a83adb1 \ + --hash=sha256:03f669755f1d6b7dbe528fe615a3164b0d8efca095382c514dd7026dfb79a9c6 \ + --hash=sha256:3eb01a8fe44116aa6d63d2010515ef1526e40caee5f766f75b2d28393332dcaa +kombu==4.0.2 \ + --hash=sha256:385bf38e6de7f3851f674671dbfe24572ce999608d293a85fb8a630654d8bd9c \ + --hash=sha256:d0fc6f2a36610a308f838db4b832dad79a681b516ac1d1a1f9d42edb58cc11a2 +amqp==2.1.4 \ + --hash=sha256:5e0871a93433f941e444c2b859da095f05034d2ac1b7c084529cfd0b6f8eef18 \ + --hash=sha256:1378cc14afeb6c2850404f322d03dec0082d11d04bdcb0360e1b10d4e6e77ef9 +vine==1.1.3 \ + --hash=sha256:739b19304065de99bd1f4665abe461b449b1022c1e4f89a7925db9d50e9741ea \ + --hash=sha256:87b95da19249373430a8fafca36f1aecb7aa0f1cc78545877857afc46aea2441 +django_celery_results==1.0.1 \ + --hash=sha256:dfa240fb535a1a2d01c9e605ad71629909318eae6b893c5009eafd7265fde10b \ + --hash=sha256:8bca2605eeff4418be7ce428a6958d64bee0f5bdf1f8e563fbc09a9e2f3d990f +django_celery_beat==1.0.1 \ + --hash=sha256:6d132af1e24c313eb47042da71cf30c5f612f93337a1c85309ad2d490f76bf8b \ + --hash=sha256:c8d5233fd0eb3404a800cc62383d241ac6f95b04d3a87b3720d212f0c85654b9 diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 8b7d2a2e..318e36fe 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -11,7 +11,7 @@ from django.utils import timezone from freezegun import freeze_time from atmo.clusters.models import Cluster -from atmo.jobs import jobs, models +from atmo.jobs import models, tasks @pytest.fixture @@ -638,7 +638,7 @@ def test_send_run_alert_mails(client, mocker, test_user, mocked_send_email = mocker.patch('atmo.email.send_email') - jobs.send_run_alert_mails() + tasks.send_run_alert_mails() mocked_send_email.assert_called_once_with( to=spark_job.created_by.email,