From bf39f12587550ee15b2ee09706cf6968da6cfcb0 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Mon, 29 Dec 2014 17:01:08 +0000
Subject: [PATCH] Simplifying CeleryExecutor

---
 airflow/airflow.cfg.template         |   7 +-
 airflow/bin/airflow                  |  11 ++-
 airflow/executors/__init__.py        |  24 ++++--
 airflow/executors/celery_executor.py | 105 ++++++++++++---------------
 airflow/executors/celery_worker.py   |  24 ------
 5 files changed, 77 insertions(+), 94 deletions(-)
 delete mode 100644 airflow/executors/celery_worker.py

diff --git a/airflow/airflow.cfg.template b/airflow/airflow.cfg.template
index 78b34377ce..f5c9bfe13e 100644
--- a/airflow/airflow.cfg.template
+++ b/airflow/airflow.cfg.template
@@ -6,6 +6,7 @@ DAGS_FOLDER: %(AIRFLOW_HOME)s/dags
 BASE_FOLDER: %(AIRFLOW_HOME)s/airflow
 BASE_URL: http://localhost:8080
 SQL_ALCHEMY_CONN: mysql://airflow:airflow@localhost:3306/airflow
+EXECUTOR: LocalExecutor
 
 [server]
 WEB_SERVER_HOST: 0.0.0.0
@@ -19,9 +20,9 @@ SMTP_PASSWORD: None
 SMTP_MAIL_FROM: 'airflow_alerts@mydomain.com'
 
 [celery]
-CELERY_APP_NAME: airflow.executors.celery_worker
-CELERY_BROKER: amqp
-CELERY_RESULTS_BACKEND: amqp://
+CELERY_APP_NAME: airflow.executors.celery_executor
+BROKER_URL: sqla+mysql://airflow:airflow@localhost:3306/airflow
+CELERY_RESULT_BACKEND: db+mysql://airflow:airflow@localhost:3306/airflow
 
 [hooks]
 HIVE_HOME_PY: '/usr/lib/hive/lib/py'
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 6abbd48671..b1c8307615 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -151,6 +151,11 @@ def master(args):
     job.run()
 
 
+def worker(args):
+    from airflow.executors.celery_executor import app
+    app.start()
+
+
 def initdb(args):
 
     if raw_input(
@@ -160,7 +165,6 @@ def initdb(args):
         format=settings.SIMPLE_LOG_FORMAT)
 
     from airflow import models
-    from airflow import jobs
 
     logging.info("Dropping tables that exist")
     models.Base.metadata.drop_all(settings.engine)
@@ -199,7 +203,6 @@
 
 if __name__ == '__main__':
 
-    reload(logging)
     logging.root.handlers = []
 
     parser = argparse.ArgumentParser()
@@ -306,5 +309,9 @@
     parser_initdb = subparsers.add_parser('initdb', help=ht)
     parser_initdb.set_defaults(func=initdb)
 
+    ht = "Start a Celery worker node"
+    parser_worker = subparsers.add_parser('worker', help=ht)
+    parser_worker.set_defaults(func=worker)
+
     args = parser.parse_args()
     args.func(args)
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index af64dbac16..78310c578d 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -1,7 +1,19 @@
-#from airflow.executors.celery_executor import CeleryExecutor
-from airflow.executors.local_executor import LocalExecutor
-#from airflow.executors.sequential_executor import SequentialExecutor
+import logging
 
-# DEFAULT_EXECUTOR = CeleryExecutor()
-DEFAULT_EXECUTOR = LocalExecutor()
-#DEFAULT_EXECUTOR = SequentialExecutor()
+from airflow.settings import conf
+
+_EXECUTOR = conf.get('core', 'EXECUTOR')
+
+if _EXECUTOR == 'LocalExecutor':
+    from airflow.executors.local_executor import LocalExecutor
+    DEFAULT_EXECUTOR = LocalExecutor()
+elif _EXECUTOR == 'CeleryExecutor':
+    from airflow.executors.celery_executor import CeleryExecutor
+    DEFAULT_EXECUTOR = CeleryExecutor()
+elif _EXECUTOR == 'SequentialExecutor':
+    from airflow.executors.sequential_executor import SequentialExecutor
+    DEFAULT_EXECUTOR = SequentialExecutor()
+else:
+    raise Exception("Executor {0} not supported.".format(_EXECUTOR))
+
+logging.info("Using executor " + _EXECUTOR)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 596e1f60fa..b1421819ed 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -1,76 +1,63 @@
 import logging
-import multiprocessing
 import time
 
 from airflow.executors.base_executor import BaseExecutor
 from airflow.configuration import conf
 from airflow.utils import State
-from celery_worker import execute_command
+
+import subprocess
+
+from celery import Celery
+from celery import states as celery_states
+
+'''
+To start the celery worker, run the command:
+airflow worker
+'''
+
+
+class CeleryConfig(object):
+    BROKER_URL = conf.get('celery', 'BROKER_URL')
+    CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND')
+    CELERY_ACCEPT_CONTENT = ['json', 'pickle']
+
+app = Celery(
+    conf.get('celery', 'CELERY_APP_NAME'),
+    config_source=CeleryConfig)
+
+
+@app.task
+def execute_command(command):
+    logging.info("Executing command in Celery " + command)
+    rc = subprocess.Popen(command, shell=True).wait()
+    if rc:
+        logging.error(rc)
+        raise Exception('Celery command failed')
 
 
 class CeleryExecutor(BaseExecutor):
-    """ Submits the task to RabbitMQ, which is picked up and executed by a bunch
-    of worker processes """
-
-    def __init__(self, parallelism=1):
-        super(CeleryExecutor, self).__init__()
-        self.parallelism = parallelism
 
     def start(self):
-        self.queue = multiprocessing.JoinableQueue()
-        self.result_queue = multiprocessing.Queue()
-        self.workers = [
-            CelerySubmitter(self.queue, self.result_queue)
-            for i in xrange(self.parallelism)]
-
-        for w in self.workers:
-            w.start()
+        self.tasks = {}
+        self.last_state = {}
 
     def execute_async(self, key, command):
-        self.queue.put((key, command))
+        self.tasks[key] = execute_command.delay(command)
+        self.last_state[key] = celery_states.PENDING
 
     def heartbeat(self):
-        while not self.result_queue.empty():
-            results = self.result_queue.get()
-            self.change_state(*results)
+        for key, async in self.tasks.items():
+            if self.last_state[key] != async.state:
+                if async.state == celery_states.SUCCESS:
+                    self.change_state(key, State.SUCCESS)
+                elif async.state in celery_states.FAILURE_STATES:
+                    self.change_state(key, State.FAILED)
+                self.last_state[key] = async.state
 
     def end(self):
-        # Sending poison pill to all worker
-        [self.queue.put(None) for w in self.workers]
-        self.queue.join()
-
-
-class CelerySubmitter(multiprocessing.Process):
-
-    def __init__(self, task_queue, result_queue):
-        multiprocessing.Process.__init__(self)
-        self.task_queue = task_queue
-        self.result_queue = result_queue
-
-    def run(self):
-        while True:
-            work = self.task_queue.get()
-            if work is None:
-                # Received poison pill, no more tasks to run
-                self.task_queue.task_done()
-                break
-            key, command = work
-            BASE_FOLDER = conf.get('core', 'BASE_FOLDER')
-            command = (
-                "exec bash -c '"
-                "cd $AIRFLOW_HOME;\n" +
-                "source init.sh;\n" +
-                command +
-                "'"
-            ).format(**locals())
-
-            try:
-                res = execute_command.delay(command)
-                result = res.get()
-            except Exception as e:
-                self.result_queue.put((key, State.FAILED))
-                logging.exception(e)
-                raise e
-            self.result_queue.put((key, State.SUCCESS))
-            self.task_queue.task_done()
-            time.sleep(1)
-
+        print('entering end')
+        while any([
+                async.state not in celery_states.READY_STATES
+                for async in self.tasks.values()]):
+            print(str([async.state for async in self.tasks.values()]))
+            time.sleep(5)
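
The new executor boils down to two Celery primitives: execute_command.delay() to submit work, and the returned AsyncResult's state compared against the celery.states constants to track progress. Below is a minimal, self-contained sketch of that submit-and-poll pattern; it is not part of the patch. The app name and example command are made up, and the CELERY_ALWAYS_EAGER setting (task_always_eager on newer Celery) runs the task in-process purely so the sketch works without a broker, whereas the real executor points BROKER_URL and CELERY_RESULT_BACKEND at the database configured above.

    import subprocess

    from celery import Celery
    from celery import states as celery_states

    app = Celery('polling_sketch')
    app.conf.CELERY_ALWAYS_EAGER = True  # demo only: run tasks in-process, no broker needed


    @app.task
    def execute_command(command):
        # Same pattern as the patched task: shell out, fail loudly on a non-zero exit
        rc = subprocess.Popen(command, shell=True).wait()
        if rc:
            raise Exception('Celery command failed')


    if __name__ == '__main__':
        # execute_async() amounts to keeping the AsyncResult returned by delay()
        tasks = {'example_key': execute_command.delay('echo hello')}

        # heartbeat() amounts to comparing AsyncResult.state against the
        # celery.states constants until a terminal (READY_STATES) state is reached
        for key, result in tasks.items():
            if result.state == celery_states.SUCCESS:
                print('%s succeeded' % key)
            elif result.state in celery_states.FAILURE_STATES:
                print('%s failed' % key)
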
diff --git a/airflow/executors/celery_worker.py b/airflow/executors/celery_worker.py
deleted file mode 100644
index f1de6a47d8..0000000000
--- a/airflow/executors/celery_worker.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import subprocess
-import logging
-
-from airflow.configuration import conf
-from celery import Celery
-
-# to start the celery worker, run the command:
-# "celery -A airflow.executors.celery_worker worker --loglevel=info"
-
-# app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
-
-app = Celery(
-    conf.get('celery', 'CELERY_APP_NAME'),
-    backend=conf.get('celery', 'CELERY_BROKER'),
-    broker=conf.get('celery', 'CELERY_RESULTS_BACKEND'))
-
-@app.task(name='airflow.executors.celery_worker.execute_command')
-def execute_command(command):
-    logging.info("Executing command in Celery " + command)
-    try:
-        subprocess.Popen(command, shell=True).wait()
-    except Exception as e:
-        raise e
-    return True
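
With celery_worker.py removed, the old invocation from its comments ("celery -A airflow.executors.celery_worker worker --loglevel=info") is superseded by the new "airflow worker" subcommand, whose worker() function simply calls app.start() on the Celery app now defined in celery_executor.py and lets Celery parse the command line. The snippet below is a hedged sketch of an equivalent programmatic entry point, assuming Celery's standard worker_main() API; the --loglevel value is illustrative, not something the patch sets.

    # Roughly what "airflow worker" amounts to: hand control to Celery so it
    # runs a worker against the app (and its execute_command task) defined in
    # airflow.executors.celery_executor.
    from airflow.executors.celery_executor import app

    if __name__ == '__main__':
        app.worker_main(['worker', '--loglevel=info'])
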