This does a few things:

- Move from RQ to Celery for better control of the queuing process, scheduling, and processing guarantees.
- Implement jittered exponential retries (see https://www.awsarchitectureblog.com/2015/03/backoff.html and https://github.com/awslabs/aws-arch-backoff-simulator for more information; a minimal sketch follows this list).
- Trigger some tasks only on database commit.
- Remove unneeded Newrelic decorators.
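
For reference, a minimal sketch of the "full jitter" strategy from the AWS post linked above, using the same parameters as the ExpoBackoffFullJitter class added below (base=1, cap=60 * 60); the retry count n is assumed to come from the task's retry state:

import random

def full_jitter_backoff(n, base=1, cap=60 * 60):
    # capped exponential delay, jittered uniformly over [0, delay)
    delay = min(cap, pow(2, n) * base)
    return random.uniform(0, delay)

# retry 0 waits up to 1s, retry 5 up to 32s; from retry 12 on,
# the delay is capped at one hour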
This commit is contained in:
Jannis Leidel 2017-03-16 15:26:23 +01:00
Parent 27dfbc554f
Commit 5c8b20eead
13 changed files with 238 additions and 135 deletions

1
.gitignore vendored
View file

@@ -1,4 +1,5 @@
.env
.local
*.pyc
.DS_Store
docs/_build

View file

@@ -3,68 +3,14 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import logging
import django_rq
import redis
import session_csrf
from django.apps import AppConfig
from django.utils.module_loading import import_string
DEFAULT_JOB_TIMEOUT = 15
logger = logging.getLogger("django")
job_schedule = {
'delete_clusters': {
'cron_string': '* * * * *',
'func': 'atmo.clusters.jobs.delete_clusters',
'timeout': 15,
},
'send_expiration_mails': {
'cron_string': '*/5 * * * *',
'func': 'atmo.clusters.jobs.send_expiration_mails',
'timeout': 60,
},
'send_run_alert_mails': {
'cron_string': '* * * * *',
'func': 'atmo.jobs.jobs.send_run_alert_mails',
'timeout': 60,
},
'update_clusters_info': {
'cron_string': '* * * * *',
'func': 'atmo.clusters.jobs.update_clusters_info',
'timeout': 15,
},
'run_jobs': {
'cron_string': '*/5 * * * *',
'func': 'atmo.jobs.jobs.run_jobs',
'timeout': 45,
},
'clean_orphan_obj_perms': {
'cron_string': '30 3 * * *',
'func': 'guardian.utils.clean_orphan_obj_perms',
'timeout': 60,
}
}
def register_job_schedule():
"""
Register the RQ job schedule, and cancel all the old ones
"""
scheduler = django_rq.get_scheduler()
for job_id, params in list(job_schedule.items()):
scheduler.cron(
params['cron_string'],
id=job_id,
func=import_string(params['func']),
timeout=params.get('timeout', DEFAULT_JOB_TIMEOUT)
)
for job in scheduler.get_jobs():
if job.id not in job_schedule:
scheduler.cancel(job)
class AtmoAppConfig(AppConfig):
name = 'atmo'
@@ -79,15 +25,6 @@ class AtmoAppConfig(AppConfig):
# https://github.com/mozilla/sugardough/issues/38
session_csrf.monkeypatch()
# Register rq scheduled jobs, if Redis is available
try:
connection = django_rq.get_connection('default')
connection.ping()
except redis.ConnectionError:
logger.warning('Could not connect to Redis, not registering RQ jobs')
else:
register_job_schedule()
class KeysAppConfig(AppConfig):
name = 'atmo.keys'

92
atmo/celery.py Normal file
View file

@@ -0,0 +1,92 @@
import functools
import os
import random
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'atmo.settings')
os.environ.setdefault('DJANGO_CONFIGURATION', 'Dev')
import configurations # noqa
configurations.setup()
class ExpoBackoffFullJitter:
"""
Implement fully jittered exponential retries.
For more information, see:
- https://www.awsarchitectureblog.com/2015/03/backoff.html
- https://github.com/awslabs/aws-arch-backoff-simulator
Copyright 2015 Amazon
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
def __init__(self, base, cap):
self.base = base
self.cap = cap
def expo(self, n):
return min(self.cap, pow(2, n) * self.base)
def backoff(self, n):
v = self.expo(n)
return random.uniform(0, v)
class AtmoCelery(Celery):
"""
A custom Celery class to implement exponential backoff retries.
"""
def autoretry_task(self, autoretry_on=None, *args, **opts):
if autoretry_on is None:
autoretry_on = Exception
super_task = super().task
def decorator(func):
bind_passed = 'bind' in opts
opts['bind'] = True
@super_task(*args, **opts)
@functools.wraps(func)
def wrapper(task, *fargs, **fkwargs):
try:
if bind_passed:
return func(task, *fargs, **fkwargs)
else:
return func(*fargs, **fkwargs)
except autoretry_on as exc:
backoff = ExpoBackoffFullJitter(base=1, cap=60 * 60)
countdown = backoff.backoff(task.request.retries)
task.retry(
exc=exc,
args=fargs,
kwargs=fkwargs,
countdown=countdown,
)
return wrapper
return decorator
celery = AtmoCelery('atmo')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
celery.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django celery configs.
celery.autodiscover_tasks()
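
A hypothetical usage sketch of the new autoretry_task decorator (the task and the fetch_metadata helper are made up for illustration): any exception of the given type re-enqueues the task with a countdown drawn from the full-jitter backoff above.

from atmo.celery import celery

@celery.autoretry_task(autoretry_on=IOError)
def sync_cluster_info(cluster_id):
    # an IOError raised here retries the task with a jittered,
    # capped exponential countdown (base 1s, cap 1h)
    return fetch_metadata(cluster_id)  # hypothetical helper

If autoretry_on is omitted, the decorator falls back to retrying on any Exception.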

View file

@@ -3,7 +3,7 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
from django.core.management.base import BaseCommand
from ...jobs import delete_clusters
from ...jobs import deactivate_clusters
class Command(BaseCommand):
@@ -11,5 +11,5 @@ class Command(BaseCommand):
def handle(self, *args, **options):
self.stdout.write('Deleting expired clusters...', ending='')
delete_clusters()
deactivate_clusters()
self.stdout.write('done.')

View file

@@ -3,7 +3,7 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
from django.core.management.base import BaseCommand
from ...jobs import update_clusters_info
from ...jobs import update_clusters
class Command(BaseCommand):
@@ -11,5 +11,5 @@ class Command(BaseCommand):
def handle(self, *args, **options):
self.stdout.write('Updating cluster info...', ending='')
update_clusters_info()
update_clusters()
self.stdout.write('done.')

View file

@@ -3,26 +3,29 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
from datetime import timedelta
import django_rq
import newrelic.agent
from django.conf import settings
from django.db import transaction
from django.template.loader import render_to_string
from django.utils import timezone
from .. import email
from ..celery import celery
from .models import Cluster
from .provisioners import ClusterProvisioner
@newrelic.agent.background_task(group='RQ')
def delete_clusters():
@celery.task
def deactivate_clusters():
now = timezone.now()
deactivated_clusters = []
for cluster in Cluster.objects.active().filter(end_date__lte=now):
deactivated_clusters.append([cluster.identifier, cluster.pk])
# The cluster is expired
cluster.deactivate()
return deactivated_clusters
@newrelic.agent.background_task(group='RQ')
@celery.task
def send_expiration_mails():
deadline = timezone.now() + timedelta(hours=1)
soon_expired = Cluster.objects.active().filter(
@@ -47,7 +50,7 @@ def send_expiration_mails():
cluster.save()
@newrelic.agent.background_task(group='RQ')
@celery.autoretry_task()
def update_master_address(cluster_id, force=False):
"""
Update the public IP address for the cluster with the given cluster ID
@@ -65,8 +68,8 @@ def update_master_address(cluster_id, force=False):
cluster.save()
@newrelic.agent.background_task(group='RQ')
def update_clusters_info():
@celery.autoretry_task()
def update_clusters():
"""
Update the cluster metadata from AWS for the pending
clusters.
@@ -80,7 +83,7 @@ def update_clusters_info():
# Short-circuit for no active clusters (e.g. on weekends)
if not active_clusters.exists():
return
return []
# get the start dates of the active clusters, set to the start of the day
# to counteract time differences between atmo and AWS and use the oldest
@@ -94,6 +97,7 @@ def update_clusters_info():
cluster_mapping[cluster_info['jobflow_id']] = cluster_info
# go through pending clusters and update the state if needed
updated_clusters = []
for cluster in active_clusters:
info = cluster_mapping.get(cluster.jobflow_id)
# ignore if no info was found for some reason,
@@ -109,6 +113,11 @@ def update_clusters_info():
cluster.most_recent_status = info['state']
cluster.save()
updated_clusters.append(cluster.identifier)
# if no master address is set yet, enqueue a task to update the public IP address
if not cluster.master_address:
django_rq.enqueue(update_master_address, cluster.id)
# bind cluster.id now: a bare lambda would capture the loop variable
# and enqueue the last cluster's ID for every callback
transaction.on_commit(
lambda cluster_id=cluster.id: update_master_address.delay(cluster_id)
)
return updated_clusters
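
Switching from django_rq.enqueue to transaction.on_commit above means the task is only enqueued once the surrounding database transaction commits, so a worker can never pick up the task before the updated cluster row is visible. A minimal sketch of the pattern, with illustrative names:

from django.db import transaction

def save_cluster_and_refresh_address(cluster):
    with transaction.atomic():
        cluster.save()
        # runs only if the transaction commits; skipped on rollback
        transaction.on_commit(
            lambda pk=cluster.pk: update_master_address.delay(pk)
        )

The pk=cluster.pk default argument binds the value at definition time, which matters whenever the callback is registered inside a loop, as in the update of active clusters above.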

View file

@@ -3,12 +3,12 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import logging
import newrelic.agent
from django.conf import settings
from django.db import transaction
from django.template.loader import render_to_string
from django.utils import timezone
from atmo.celery import celery
from atmo.clusters.models import Cluster
from atmo.clusters.provisioners import ClusterProvisioner
@@ -18,7 +18,7 @@ from .models import SparkJob, SparkJobRunAlert
logger = logging.getLogger(__name__)
@newrelic.agent.background_task(group='RQ')
@celery.autoretry_task()
def run_jobs():
"""
Run all the scheduled tasks that are supposed to run.
@@ -75,14 +75,15 @@ def run_jobs():
return run_jobs
@newrelic.agent.background_task(group='RQ')
@celery.task
def send_run_alert_mails():
failed_run_alerts = SparkJobRunAlert.objects.filter(
reason_code__in=Cluster.FAILED_STATE_CHANGE_REASON_LIST,
mail_sent_date__isnull=True,
).prefetch_related('run__spark_job__created_by')
failed_jobs = []
for alert in failed_run_alerts:
failed_jobs.append(alert.run.spark_job.identifier)
subject = '[ATMO] Running Spark job %s failed' % alert.run.spark_job.identifier
body = render_to_string(
'atmo/jobs/mails/failed_run_alert_body.txt', {
@@ -98,3 +99,4 @@ def send_run_alert_mails():
)
alert.mail_sent_date = timezone.now()
alert.save()
return failed_jobs

View file

@@ -10,10 +10,12 @@ https://docs.djangoproject.com/en/1.9/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.9/ref/settings/
"""
import logging
import os
import subprocess
from datetime import timedelta
from celery.schedules import crontab
from configurations import Configuration, values
from django.contrib.messages import constants as messages
from django.core.urlresolvers import reverse_lazy
@@ -21,6 +23,80 @@ from dockerflow.version import get_version
from raven.transport.requests import RequestsHTTPTransport
class Celery:
CELERY_BROKER_TRANSPORT_OPTIONS = {
# only send messages to actual virtual AMQP host instead of all
'fanout_prefix': True,
# have the workers only subscribe to worker-related events (less network traffic)
'fanout_patterns': True,
# 8 days, since that's longer than our biggest interval to schedule a task (a week)
# this is needed to be able to use ETAs and countdowns
# http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#id1
'visibility_timeout': 8 * 24 * 60 * 60,
}
# Use the django_celery_results database backend.
CELERY_RESULT_BACKEND = 'django-db'
# Keep task results for two weeks (useful for debugging), then throw them away.
CELERY_RESULT_EXPIRES = timedelta(days=14)
# Track if a task has been started, not only pending etc.
CELERY_TASK_TRACK_STARTED = True
# Add a 1 minute soft timeout to all Celery tasks.
CELERY_TASK_SOFT_TIME_LIMIT = 60
# And a 2 minute hard timeout.
CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT * 2
# Send SENT events as well to know when the task has left the scheduler.
CELERY_TASK_SEND_SENT_EVENT = True
# Stop hijacking the root logger so Sentry works.
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
# Use the django_celery_beat scheduler for database based schedules.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# The default/initial schedule to use.
CELERY_BEAT_SCHEDULE = {
'deactivate_clusters': {
'schedule': crontab(minute='*'),
'task': 'atmo.clusters.tasks.deactivate_clusters',
'options': {
'soft_time_limit': 15,
'expires': 40,
},
},
'send_expiration_mails': {
'schedule': crontab(minute='*/5'), # every 5 minutes
'task': 'atmo.clusters.tasks.send_expiration_mails',
'options': {
'expires': 4 * 60,
},
},
'send_run_alert_mails': {
'schedule': crontab(minute='*'),
'task': 'atmo.jobs.tasks.send_run_alert_mails',
'options': {
'expires': 40,
},
},
'update_clusters': {
'schedule': crontab(minute='*'),
'task': 'atmo.clusters.tasks.update_clusters',
'options': {
'soft_time_limit': 15,
'expires': 40,
},
},
'run_jobs': {
'schedule': crontab(minute='*/5'),
'task': 'atmo.jobs.tasks.run_jobs',
'options': {
'soft_time_limit': 45,
'expires': 40,
},
},
'clean_orphan_obj_perms': {
'schedule': crontab(minute=30, hour=3),
'task': 'guardian.utils.clean_orphan_obj_perms',
}
}
class Constance:
"Constance settings"
CONSTANCE_BACKEND = 'constance.backends.redisd.RedisBackend'
@@ -129,7 +205,7 @@ class CSP:
)
class Core(Constance, CSP, AWS, Configuration):
class Core(AWS, Celery, Constance, CSP, Configuration):
"""Settings that will never change per-environment."""
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
@@ -150,7 +226,6 @@ class Core(Constance, CSP, AWS, Configuration):
'atmo.users',
# Third party apps
'django_rq',
'allauth',
'allauth.account',
'allauth.socialaccount',
@@ -158,6 +233,8 @@ class Core(Constance, CSP, AWS, Configuration):
'constance',
'constance.backends.database',
'dockerflow.django',
'django_celery_results',
'django_celery_beat',
# Django apps
'django.contrib.sites',
@@ -187,19 +264,6 @@ class Core(Constance, CSP, AWS, Configuration):
WSGI_APPLICATION = 'atmo.wsgi.application'
RQ_SHOW_ADMIN_LINK = True
# set the exponential backoff mechanism of rq-retry
def exponential_backoff(tries, base=2):
return ','.join([str(pow(base, exponent)) for exponent in range(tries)])
# the total number of tries for each task, 1 regular try + 5 retries
RQ_RETRY_MAX_TRIES = 6
os.environ['RQ_RETRY_MAX_TRIES'] = str(RQ_RETRY_MAX_TRIES)
# this needs to be set as an environment variable since that's how rq-retry works
os.environ['RQ_RETRY_DELAYS'] = RQ_RETRY_DELAYS = exponential_backoff(RQ_RETRY_MAX_TRIES - 1)
# Add the django-allauth authentication backend.
AUTHENTICATION_BACKENDS = (
'django.contrib.auth.backends.ModelBackend',
@@ -348,17 +412,14 @@ class Base(Core):
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
DATABASES = values.DatabaseURLValue('postgres://postgres@db/postgres')
RQ_QUEUES = {
'default': {
'USE_REDIS_CACHE': 'default',
}
}
REDIS_URL_DEFAULT = 'redis://redis:6379/1'
CACHES = values.CacheURLValue(
'redis://redis:6379/1',
REDIS_URL_DEFAULT,
environ_prefix=None,
environ_name='REDIS_URL',
)
# Use redis as the Celery broker.
CELERY_BROKER_URL = os.environ.get('REDIS_URL', REDIS_URL_DEFAULT)
LOGGING_USE_JSON = values.BooleanValue(False)
@@ -411,16 +472,6 @@ class Base(Core):
'handlers': ['console'],
'propagate': False,
},
'rq': {
'handlers': ['console', 'sentry'],
'level': 'INFO',
'propagate': False,
},
'rq.worker': {
'handlers': ['console', 'sentry'],
'level': 'INFO',
'propagate': False,
},
'request.summary': {
'handlers': ['console'],
'level': 'DEBUG',
@@ -498,6 +549,7 @@ class Stage(Base):
# Sentry setup
SENTRY_DSN = values.Value(environ_prefix=None)
SENTRY_PUBLIC_DSN = values.Value(environ_prefix=None)
SENTRY_CELERY_LOGLEVEL = logging.INFO
MIDDLEWARE_CLASSES = (
'raven.contrib.django.raven_compat.middleware.SentryResponseErrorIdMiddleware',

View file

@@ -16,7 +16,6 @@ admin.site.login = login_required(admin.site.login)
urlpatterns = [
url(r'^$', views.dashboard, name='dashboard'),
url(r'^admin/rq/', include('django_rq.urls')),
url(r'^admin/', include(admin.site.urls)),
url(r'clusters/', include('atmo.clusters.urls')),

View file

@@ -40,10 +40,11 @@ case $1 in
exec python manage.py runserver 0.0.0.0:${PORT}
;;
worker)
exec newrelic-admin run-python manage.py rqworker --worker-class=rq_retry.RetryWorker default
exec newrelic-admin run-program celery -A atmo.celery:celery worker -l info -O fair
;;
scheduler)
exec newrelic-admin run-python manage.py rqscheduler
python manage.py migrate --noinput
exec newrelic-admin run-program celery -A atmo.celery:celery beat -l info --pidfile /app/celerybeat.pid
;;
test)
python manage.py collectstatic --noinput

View file

@@ -166,7 +166,7 @@ error_collector.enabled = true
# To stop specific errors from reporting to the UI, set this to
# a space separated list of the Python exception type names to
# ignore. The exception name should be of the form 'module:class'.
error_collector.ignore_errors =
error_collector.ignore_errors = celery.exceptions:Retry celery.exceptions:RetryTaskError
# Browser monitoring is the Real User Monitoring feature of the UI.
# For those Python web frameworks that are supported, this

View file

@@ -109,17 +109,6 @@ pbr==2.0.0 \
--hash=sha256:0ccd2db529afd070df815b1521f01401d43de03941170f8a800e7531faba265d
redis==2.10.5 \
--hash=sha256:97156b37d7cda4e7d8658be1148c983984e1a975090ba458cc7e244025191dbd
click==6.7 \
--hash=sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d \
--hash=sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b
rq==0.7.1 \
--hash=sha256:6c36f146914f6a28009dbcee0c4610600afd1f4f493ce3a2afbd467076cddaac \
--hash=sha256:b0e98fcfe980cbc7644447d17ea2c177fcbd5c04f1f92d5136c47f00ed2d583d
django-rq==0.9.5 \
--hash=sha256:74e5ba53d3a2ade0e652a1e2f35d7a6f3b15022433c24fbcb977198e735ed9e7 \
--hash=sha256:33f9cb5d26b6cb3d2d7f8b25494b4985e09227cb80058627443bdb6bdb99e1db
croniter==0.3.15 \
--hash=sha256:ac0c9811ebdecd27bc29eb0711e2d87c8c280a53d0e92c48b600654d23b1d541
django-allauth==0.31.0 \
--hash=sha256:edfff4599cd5431e4af380ddb9fc66e7b923157dcac67cceb5604b3c58003907
python3-openid==3.1.0 \
@@ -330,11 +319,6 @@ msgpack-python==0.4.8 \
defusedxml==0.5.0 \
--hash=sha256:702a91ade2968a82beb0db1e0766a6a273f33d4616a6ce8cde475d8e09853b20 \
--hash=sha256:24d7f2f94f7f3cb6061acb215685e5125fbcdc40a857eff9de22518820b0a4f4
https://github.com/ui/rq-scheduler/archive/5dd071328ee3eeaad67ecf7cb2550ef8fa08a787.zip#egg=rq-scheduler \
--hash=sha256:837c74ca0c42632a796bcf954dcde5515c17168c4a27d169f49cbc6ed880df4e
rq-retry==0.3.0 \
--hash=sha256:a0ecb39b02c9df3bac9ef88cda2d93cefb9120e3d8ccef920b38a851330e917f \
--hash=sha256:c8cafedf3b11b931f422b12b304c9815cc84c3d145cd05aff337a0b973d0c000
asn1crypto==0.22.0 \
--hash=sha256:d232509fefcfcdb9a331f37e9c9dc20441019ad927c7d2176cf18ed5da0ba097 \
--hash=sha256:cbbadd640d3165ab24b06ef25d1dca09a3441611ac15f6a6b452474fdf0aed1a
@@ -357,3 +341,29 @@ future==0.16.0 \
pytest-flake8==0.8.1 \
--hash=sha256:aa10a6db147485d71dad391d4149388904c3072194d51755f64784ff128845fd \
--hash=sha256:8efaf4595a13079197ac740a12e6a87e3403f08133a42d3ac5984474f6f91681
celery==4.0.2 \
--hash=sha256:0e5b7e0d7f03aa02061abfd27aa9da05b6740281ca1f5228a54fbf7fe74d8afa \
--hash=sha256:e3d5a6c56a73ff8f2ddd4d06dc37f4c2afe4bb4da7928b884d0725ea865ef54d
billiard==3.5.0.2 \
--hash=sha256:82478ebdd3cd4d357613cb4735eb828014364c5e02d3bba96ea6c1565a2c0172 \
--hash=sha256:a93c90d59fca62ad63f92b3d2bf1d752c154dde90a3100dba4c8e439386e534c \
--hash=sha256:d8df4b276b11b3e2fe25652e411487bda6e5bac4f8fd236a278a2bfe300f7c43 \
--hash=sha256:e740e352bbf7b6c8cc92a2596cc2da2bfb4ab1009aeb68bf844456af4e924278 \
--hash=sha256:52c2e01c95c6edae9ca1f48d83503e6ceeafbf28c19929a1e917fe951a83adb1 \
--hash=sha256:03f669755f1d6b7dbe528fe615a3164b0d8efca095382c514dd7026dfb79a9c6 \
--hash=sha256:3eb01a8fe44116aa6d63d2010515ef1526e40caee5f766f75b2d28393332dcaa
kombu==4.0.2 \
--hash=sha256:385bf38e6de7f3851f674671dbfe24572ce999608d293a85fb8a630654d8bd9c \
--hash=sha256:d0fc6f2a36610a308f838db4b832dad79a681b516ac1d1a1f9d42edb58cc11a2
amqp==2.1.4 \
--hash=sha256:5e0871a93433f941e444c2b859da095f05034d2ac1b7c084529cfd0b6f8eef18 \
--hash=sha256:1378cc14afeb6c2850404f322d03dec0082d11d04bdcb0360e1b10d4e6e77ef9
vine==1.1.3 \
--hash=sha256:739b19304065de99bd1f4665abe461b449b1022c1e4f89a7925db9d50e9741ea \
--hash=sha256:87b95da19249373430a8fafca36f1aecb7aa0f1cc78545877857afc46aea2441
django_celery_results==1.0.1 \
--hash=sha256:dfa240fb535a1a2d01c9e605ad71629909318eae6b893c5009eafd7265fde10b \
--hash=sha256:8bca2605eeff4418be7ce428a6958d64bee0f5bdf1f8e563fbc09a9e2f3d990f
django_celery_beat==1.0.1 \
--hash=sha256:6d132af1e24c313eb47042da71cf30c5f612f93337a1c85309ad2d490f76bf8b \
--hash=sha256:c8d5233fd0eb3404a800cc62383d241ac6f95b04d3a87b3720d212f0c85654b9

View file

@@ -11,7 +11,7 @@ from django.utils import timezone
from freezegun import freeze_time
from atmo.clusters.models import Cluster
from atmo.jobs import jobs, models
from atmo.jobs import models, tasks
@pytest.fixture
@@ -638,7 +638,7 @@ def test_send_run_alert_mails(client, mocker, test_user,
mocked_send_email = mocker.patch('atmo.email.send_email')
jobs.send_run_alert_mails()
tasks.send_run_alert_mails()
mocked_send_email.assert_called_once_with(
to=spark_job.created_by.email,