[AIRFLOW-966] Make celery broker_transport_options configurable

Required for changing visibility timeout and other
options required
for Redis/SQS.

Closes #2842 from bolkedebruin/AIRFLOW-966
This commit is contained in:
Bolke de Bruin 2017-12-05 10:13:05 +01:00 коммит произвёл Fokko Driesprong
Родитель 97383f76d0
Коммит aa737a582c
4 изменённых файлов: 27 добавлений и 1 удалений

Просмотреть файл

@ -310,6 +310,13 @@ default_queue = default
# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
# The visibility timeout defines the number of seconds to wait for the worker
# to acknowledge the task before the message is redelivered to another worker.
# Make sure to increase the visibility timeout to match the time of the longest
# ETA youre planning to use. Especially important in case of using Redis or SQS
visibility_timeout = 21600
# This section only applies if you are using the DaskExecutor in
# [core] section above

Просмотреть файл

@ -19,6 +19,10 @@ from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin
broker_transport_options = configuration.getsection('celery_broker_transport_options')
if broker_transport_options is None:
broker_transport_options = {'visibility_timeout': 21600}
'accept_content': ['json', 'pickle'],
'event_serializer': 'json',
@ -28,7 +32,7 @@ DEFAULT_CELERY_CONFIG = {
'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
'broker_url': configuration.get('celery', 'BROKER_URL'),
'broker_transport_options': {'visibility_timeout': 21600},
'broker_transport_options': {'visibility_timeout': broker_transport_options},
'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),

Просмотреть файл

@ -236,6 +236,12 @@ class AirflowConfigParser(ConfigParser):
ConfigParser.read(self, filenames)
def getsection(self, section):
if section in self._sections:
return self._sections[section]
return None
def as_dict(self, display_source=False, display_sensitive=False):
Returns the current configuration as an OrderedDict of OrderedDicts.
@ -423,6 +429,10 @@ def getint(section, key):
return conf.getint(section, key)
def getsection(section):
return conf.getsection(section)
def has_option(section, key):
return conf.has_option(section, key)

Просмотреть файл

@ -155,6 +155,11 @@ Note that you can also run "Celery Flower", a web UI built on top of Celery,
to monitor your workers. You can use the shortcut command ``airflow flower``
to start a Flower web server.
Some caveats:
- Make sure to use a database backed result backend
- Make sure to set a visibility timeout in [celery_broker_transport_options] that exceeds the ETA of your longest running task
- Tasks can and consume resources, make sure your worker as enough resources to run `celeryd_concurrency` tasks
Scaling Out with Dask