Enabling queue management for Celery
This commit is contained in:
Parent: 89df0b34e4
Commit: 091a489532
@@ -286,6 +286,7 @@ def worker(args):
     options = {
-        'optimization': 'fair',
+        'O': 'fair',
+        'Q': args.queues,
     }
     worker.run(**options)
     sp.kill()
 
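The 'O' and 'Q' keys mirror the celery worker's short command-line flags: -O selects the task scheduling optimization profile and -Q the comma-separated list of queues to consume from. Roughly, worker.run(**options) is the programmatic equivalent of the following invocation (the app path and queue name are illustrative assumptions):

    celery worker -A airflow.executors.celery_executor -O fair -Q default
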
@@ -494,6 +495,10 @@ def get_parser():
 
     ht = "Start a Celery worker node"
     parser_worker = subparsers.add_parser('worker', help=ht)
+    parser_worker.add_argument(
+        "-q", "--queues",
+        help="Comma delimited list of queues to serve",
+        default=conf.get('celery', 'celery_default_queue'))
     parser_worker.set_defaults(func=worker)
 
     ht = "Serve logs generated by worker"

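With the new argument, a worker can be pinned to a subset of queues at start-up; a hypothetical session (queue names are illustrative):

    airflow worker -q default
    airflow worker --queues etl,gpu

When -q is omitted, the default comes from the celery_default_queue configuration key added below.
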
@@ -23,6 +23,9 @@ defaults = {
         'scheduler_heartbeat_sec': 60,
         'authenticate': False,
     },
+    'celery': {
+        'celery_default_queue': 'default',
+    },
 }
 
 DEFAULT_CONFIG = """\

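A minimal sketch of how that default is resolved at runtime (the import path is an assumption based on how conf is used elsewhere in this diff):

    from airflow import configuration as conf

    # Returns the [celery] celery_default_queue value from airflow.cfg,
    # falling back to the in-code default 'default' defined above.
    queue = conf.get('celery', 'celery_default_queue')
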
@@ -29,10 +29,10 @@ class BaseExecutor(object):
         """
         pass
 
-    def queue_command(self, key, command, priority=1):
+    def queue_command(self, key, command, priority=1, queue=None):
         if key not in self.queued_tasks and key not in self.running:
             logging.info("Adding to queue: " + command)
-            self.queued_tasks[key] = (command, priority)
+            self.queued_tasks[key] = (command, priority, queue)
 
     def queue_task_instance(
             self, task_instance, mark_success=False, pickle_id=None,
@@ -46,7 +46,8 @@ class BaseExecutor(object):
         self.queue_command(
             task_instance.key,
             command,
-            priority=task_instance.task.priority_weight_total)
+            priority=task_instance.task.priority_weight_total,
+            queue=task_instance.queue)
 
     def sync(self):
         """
@@ -75,10 +76,10 @@ class BaseExecutor(object):
             key=lambda x: x[1][1],
             reverse=True)
         for i in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, priority) = sorted_queue.pop(0)
+            key, (command, priority, queue) = sorted_queue.pop(0)
             self.running[key] = command
             del self.queued_tasks[key]
-            self.execute_async(key, command)
+            self.execute_async(key, command=command, queue=queue)
 
     def change_state(self, key, state):
         del self.running[key]
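Taken together, queued_tasks entries are now (command, priority, queue) three-tuples, and the heartbeat drains them in descending priority before handing the queue name to execute_async. A self-contained sketch of that flow (values are illustrative, not Airflow code):

    queued_tasks = {
        'task_a': ('airflow run dag_x task_a ...', 3, 'default'),
        'task_b': ('airflow run dag_x task_b ...', 1, 'gpu'),
    }
    sorted_queue = sorted(
        queued_tasks.items(), key=lambda x: x[1][1], reverse=True)
    key, (command, priority, queue) = sorted_queue.pop(0)
    # 'task_a' is dispatched first (priority 3), targeting the 'default' queue
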
@@ -98,7 +99,7 @@ class BaseExecutor(object):
         self.event_buffer = {}
         return d
 
-    def execute_async(self, key, command):  # pragma: no cover
+    def execute_async(self, key, command, queue=None):  # pragma: no cover
         """
        This method will execute the command asynchronously.
        """

@@ -15,6 +15,8 @@ To start the celery worker, run the command:
 airflow worker
 '''
 
+CELERY_DEFAULT_QUEUE = conf.get('celery', 'CELERY_DEFAULT_QUEUE')
+
 
 class CeleryConfig(object):
     CELERY_ACCEPT_CONTENT = ['json', 'pickle']
@@ -23,6 +25,7 @@ class CeleryConfig(object):
     BROKER_URL = conf.get('celery', 'BROKER_URL')
     CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND')
     CELERYD_CONCURRENCY = conf.getint('celery', 'CELERYD_CONCURRENCY')
+    CELERY_DEFAULT_QUEUE = CELERY_DEFAULT_QUEUE
 
 app = Celery(
     conf.get('celery', 'CELERY_APP_NAME'),
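CELERY_DEFAULT_QUEUE is a standard Celery 3.x setting: once the config class is loaded into the app, any task published without an explicit queue lands on that queue. A minimal sketch (the app name is illustrative; config_from_object is Celery's documented loading mechanism):

    app = Celery('airflow.executors.celery_executor')
    app.config_from_object(CeleryConfig)
    # execute_command messages now default to CELERY_DEFAULT_QUEUE unless
    # apply_async(..., queue=...) overrides the routing.
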
@@ -39,21 +42,21 @@ def execute_command(command):
 
 
 class CeleryExecutor(BaseExecutor):
-    '''
+    """
     CeleryExecutor is recommended for production use of Airflow. It allows
     distributing the execution of task instances to multiple worker nodes.
 
     Celery is a simple, flexible and reliable distributed system to process
     vast amounts of messages, while providing operations with the tools
     required to maintain such a system.
-    '''
+    """
 
     def start(self):
         self.tasks = {}
         self.last_state = {}
 
-    def execute_async(self, key, command):
-        self.tasks[key] = execute_command.delay(command)
+    def execute_async(self, key, command, queue=CELERY_DEFAULT_QUEUE):
+        self.tasks[key] = execute_command.apply_async(args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
     def sync(self):
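A Celery subtlety justifies the apply_async call above: the .delay() shortcut forwards every argument to the task function itself, so execute_command.delay(command, queue=queue) would attempt to call execute_command(command, queue=queue) instead of routing the message. Execution options such as queue are only accepted by apply_async:

    # task arguments and routing options are kept separate:
    execute_command.apply_async(args=[command], queue=queue)
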
@@ -56,7 +56,7 @@ class LocalExecutor(BaseExecutor):
         for w in self.workers:
             w.start()
 
-    def execute_async(self, key, command):
+    def execute_async(self, key, command, queue=None):
         self.queue.put((key, command))
 
     def sync(self):

@@ -18,7 +18,7 @@ class SequentialExecutor(BaseExecutor):
         super(SequentialExecutor, self).__init__()
         self.commands_to_run = []
 
-    def execute_async(self, key, command):
+    def execute_async(self, key, command, queue=None):
         self.commands_to_run.append((key, command,))
 
     def sync(self):

@@ -898,6 +898,10 @@ class BaseOperator(Base):
         This is useful if the different instances of a task X alter
         the same asset, and this asset is used by the dependencies of task X.
     :type wait_for_downstream: bool
+    :param queue: which queue to target when running this job. Not
+        all executors implement queue management; the CeleryExecutor
+        does support targeting specific queues.
+    :type queue: str
     :param dag: a reference to the dag the task is attached to (if any)
     :type dag: DAG
     :param priority_weight: priority weight of this task against other tasks.
@@ -949,6 +953,7 @@ class BaseOperator(Base):
             default_args=None,
             adhoc=False,
             priority_weight=1,
+            queue=None,
             *args,
             **kwargs):
 
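At the operator level the feature surfaces as a plain constructor argument. A hypothetical DAG snippet (operator choice, queue name, and dag variable are illustrative):

    from airflow.operators import BashOperator

    t = BashOperator(
        task_id='train_model',
        bash_command='python train.py',
        # honored by CeleryExecutor; accepted but ignored by
        # LocalExecutor and SequentialExecutor, as shown above
        queue='gpu',
        dag=dag)

Only workers started with airflow worker -q gpu will pick this task up.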