diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index e0c94c11d4..4de8252652 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -15,6 +15,7 @@ from builtins import object import logging import subprocess +import ssl import time from celery import Celery @@ -46,6 +47,18 @@ class CeleryConfig(object): CELERYD_CONCURRENCY = configuration.getint('celery', 'CELERYD_CONCURRENCY') CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE CELERY_DEFAULT_EXCHANGE = DEFAULT_QUEUE + if configuration.get('celery', 'CELERY_SSL_ACTIVE'): + try: + BROKER_USE_SSL = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'), + 'certfile': configuration.get('celery', 'CELERY_SSL_CERT'), + 'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'), + 'cert_reqs': ssl.CERT_REQUIRED} + except ValueError: + raise AirflowException('ValueError: CELERY_SSL_ACTIVE is True, please ensure CELERY_SSL_KEY, ' + 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set') + except Exception as e: + raise AirflowException('Exception: There was an unknown Celery SSL Error. Please ensure you want to use ' + 'SSL and/or have all necessary certs and key.') app = Celery( configuration.get('celery', 'CELERY_APP_NAME'), diff --git a/docs/security.rst b/docs/security.rst index ada34a27e1..6c0893d0d5 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -321,6 +321,17 @@ standard port 443, you'll need to configure that too. Be aware that super user p web_server_port = 443 base_url = http://:443 +Enable CeleryExecutor with SSL. Ensure you properly generate client and server +certs and keys. + +.. code-block:: bash + + [celery] + CELERY_SSL_ACTIVE = True + CELERY_SSL_KEY = + CELERY_SSL_CERT = + CELERY_SSL_CACERT = + Impersonation -------------