From 2536f06e2e0a0f883c71a603cf36487599813d43 Mon Sep 17 00:00:00 2001 From: Maxime Date: Fri, 9 Jan 2015 20:04:45 +0000 Subject: [PATCH] Making concurency a conf param, stting CELERYD_PREFETCH_MULTIPLIER=1 --- airflow/airflow.cfg.template | 1 + airflow/executors/celery_executor.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/airflow/airflow.cfg.template b/airflow/airflow.cfg.template index 106bc4bf32..a95094fedb 100644 --- a/airflow/airflow.cfg.template +++ b/airflow/airflow.cfg.template @@ -23,6 +23,7 @@ SMTP_MAIL_FROM: 'airflow_alerts@mydomain.com' CELERY_APP_NAME: airflow.executors.celery_executor BROKER_URL = sqla+mysql://airflow:airflow@localhost:3306/airflow CELERY_RESULT_BACKEND = db+mysql://airflow:airflow@localhost:3306/airflow +CELERYD_CONCURRENCY = 16 WORKER_LOG_SERVER_PORT = 8793 [hooks] diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 87203cfbf2..4c0d44f10b 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -20,6 +20,8 @@ class CeleryConfig(object): BROKER_URL = conf.get('celery', 'BROKER_URL') CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND') CELERY_ACCEPT_CONTENT = ['json', 'pickle'] + CELERYD_PREFETCH_MULTIPLIER = 1 + CELERYD_CONCURRENCY = int(conf.get('celery', 'CELERYD_CONCURRENCY')) app = Celery( conf.get('celery', 'CELERY_APP_NAME'),