A few adjustments on CeleryExecutor

This commit is contained in:
Maxime 2014-12-30 18:07:01 +00:00
Родитель bf39f12587
Коммит 34bb46fef0
3 изменённых файлов: 3 добавлений и 2 удалений

Просмотреть файл

@ -7,7 +7,7 @@ _EXECUTOR = conf.get('core', 'EXECUTOR')
if _EXECUTOR == 'LocalExecutor':
from airflow.executors.local_executor import LocalExecutor
DEFAULT_EXECUTOR = LocalExecutor()
elif _EXECUTOR == 'LocalExecutor':
elif _EXECUTOR == 'CeleryExecutor':
from airflow.executors.celery_executor import CeleryExecutor
DEFAULT_EXECUTOR = CeleryExecutor()
elif _EXECUTOR == 'SequentialExecutor':

Просмотреть файл

@ -50,7 +50,7 @@ class CeleryExecutor(BaseExecutor):
if self.last_state[key] != async.state:
if async.state == celery_states.SUCCESS:
self.change_state(key, State.SUCCESS)
elif async.state in celery_states.FAILURE_STATES:
elif async.state == celery_states.FAILURE:
self.change_state(key, State.FAILED)
self.last_state[key] = async.state

Просмотреть файл

@ -2,6 +2,7 @@
source $AIRFLOW_HOME/env/bin/activate
export PYTHONPATH=$AIRFLOW_HOME
export AIRFLOW_CONFIG_PATH=$AIRFLOW_HOME/airflow/airflow.cfg
export PATH=$PATH:$AIRFLOW_HOME/airflow/bin
# Create the conf file from template if it doesn't exist
if [ ! -f $AIRFLOW_CONFIG_PATH ]; then