From 091a489532304e2a9887412debed72e18afc8274 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Thu, 14 May 2015 20:17:50 -0700 Subject: [PATCH] Enabling queue management for Celery --- airflow/bin/cli.py | 5 +++++ airflow/configuration.py | 3 +++ airflow/executors/base_executor.py | 13 +++++++------ airflow/executors/celery_executor.py | 11 +++++++---- airflow/executors/local_executor.py | 2 +- airflow/executors/sequential_executor.py | 2 +- airflow/models.py | 5 +++++ 7 files changed, 29 insertions(+), 12 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 5d3f881cbe..1515c57571 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -286,6 +286,7 @@ def worker(args): options = { 'optimization': 'fair', 'O': 'fair', + 'Q': args.queues, } worker.run(**options) sp.kill() @@ -494,6 +495,10 @@ def get_parser(): ht = "Start a Celery worker node" parser_worker = subparsers.add_parser('worker', help=ht) + parser_worker.add_argument( + "-q", "--queues", + help="Comma delimited list of queues to serve", + default=conf.get('celery', 'celery_default_queue')) parser_worker.set_defaults(func=worker) ht = "Serve logs generate by worker" diff --git a/airflow/configuration.py b/airflow/configuration.py index c546b5318d..82df2b81bc 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -23,6 +23,9 @@ defaults = { 'scheduler_heartbeat_sec': 60, 'authenticate': False, }, + 'celery': { + 'celery_default_queue': 'default', + }, } DEFAULT_CONFIG = """\ diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 9321679ad4..1bb69bd7e7 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -29,10 +29,10 @@ class BaseExecutor(object): """ pass - def queue_command(self, key, command, priority=1): + def queue_command(self, key, command, priority=1, queue=None): if key not in self.queued_tasks and key not in self.running: logging.info("Adding to queue: " + command) - 
self.queued_tasks[key] = (command, priority) + self.queued_tasks[key] = (command, priority, queue) def queue_task_instance( self, task_instance, mark_success=False, pickle_id=None, @@ -46,7 +46,8 @@ class BaseExecutor(object): self.queue_command( task_instance.key, command, - priority=task_instance.task.priority_weight_total) + priority=task_instance.task.priority_weight_total, + queue=task_instance.queue) def sync(self): """ @@ -75,10 +76,10 @@ class BaseExecutor(object): key=lambda x: x[1][1], reverse=True) for i in range(min((open_slots, len(self.queued_tasks)))): - key, (command, priority) = sorted_queue.pop(0) + key, (command, priority, queue) = sorted_queue.pop(0) self.running[key] = command del self.queued_tasks[key] - self.execute_async(key, command) + self.execute_async(key, command=command, queue=queue) def change_state(self, key, state): del self.running[key] @@ -98,7 +99,7 @@ class BaseExecutor(object): self.event_buffer = {} return d - def execute_async(self, key, command): # pragma: no cover + def execute_async(self, key, command, queue=None): # pragma: no cover """ This method will execute the command asynchronously. 
""" diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index e1480cb698..1a694469ed 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -15,6 +15,8 @@ To start the celery worker, run the command: airflow worker ''' +CELERY_DEFAULT_QUEUE = conf.getint('celery', 'CELERY_DEFAULT_QUEUE') + class CeleryConfig(object): CELERY_ACCEPT_CONTENT = ['json', 'pickle'] @@ -23,6 +25,7 @@ class CeleryConfig(object): BROKER_URL = conf.get('celery', 'BROKER_URL') CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND') CELERYD_CONCURRENCY = conf.getint('celery', 'CELERYD_CONCURRENCY') + CELERY_DEFAULT_QUEUE = CELERY_DEFAULT_QUEUE app = Celery( conf.get('celery', 'CELERY_APP_NAME'), @@ -39,21 +42,21 @@ def execute_command(command): class CeleryExecutor(BaseExecutor): - ''' + """ CeleryExecutor is recommended for production use of Airflow. It allows distributing the execution of task instances to multiple worker nodes. Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. 
- ''' + """ def start(self): self.tasks = {} self.last_state = {} - def execute_async(self, key, command): - self.tasks[key] = execute_command.delay(command) + def execute_async(self, key, command, queue=CELERY_DEFAULT_QUEUE): + self.tasks[key] = execute_command.apply_async(args=[command], queue=queue) self.last_state[key] = celery_states.PENDING def sync(self): diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index becf36feb7..54519d372c 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -56,7 +56,7 @@ class LocalExecutor(BaseExecutor): for w in self.workers: w.start() - def execute_async(self, key, command): + def execute_async(self, key, command, queue=None): self.queue.put((key, command)) def sync(self): diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index c792d74a67..d1c0839d9f 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -18,7 +18,7 @@ class SequentialExecutor(BaseExecutor): super(SequentialExecutor, self).__init__() self.commands_to_run = [] - def execute_async(self, key, command): + def execute_async(self, key, command, queue=None): self.commands_to_run.append((key, command,)) def sync(self): diff --git a/airflow/models.py b/airflow/models.py index 8000c02bb7..6bcd60772b 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -898,6 +898,10 @@ class BaseOperator(Base): This is useful if the different instances of a task X alter the same asset, and this asset is used by the dependencies of task X. :type wait_for_downstream: bool + :param queue: which queue to target when running this job. Not + all executors implement queue management, the CeleryExecutor + does support targeting specific queues. + :type queue: str :param dag: a reference to the dag the task is attached to (if any) :type dag: DAG :param priority_weight: priority weight of this task against other task. 
@@ -949,6 +953,7 @@ class BaseOperator(Base): default_args=None, adhoc=False, priority_weight=1, + queue=None, *args, **kwargs):