This commit is contained in:
Maxime 2014-12-29 17:01:08 +00:00
Родитель 6934d955ff
Коммит bf39f12587
5 изменённых файлов: 77 добавлений и 94 удалений

Просмотреть файл

@ -6,6 +6,7 @@ DAGS_FOLDER: %(AIRFLOW_HOME)s/dags
BASE_FOLDER: %(AIRFLOW_HOME)s/airflow BASE_FOLDER: %(AIRFLOW_HOME)s/airflow
BASE_URL: http://localhost:8080 BASE_URL: http://localhost:8080
SQL_ALCHEMY_CONN: mysql://airflow:airflow@localhost:3306/airflow SQL_ALCHEMY_CONN: mysql://airflow:airflow@localhost:3306/airflow
EXECUTOR: LocalExecutor
[server] [server]
WEB_SERVER_HOST: 0.0.0.0 WEB_SERVER_HOST: 0.0.0.0
@ -19,9 +20,9 @@ SMTP_PASSWORD: None
SMTP_MAIL_FROM: 'airflow_alerts@mydomain.com' SMTP_MAIL_FROM: 'airflow_alerts@mydomain.com'
[celery] [celery]
CELERY_APP_NAME: airflow.executors.celery_worker CELERY_APP_NAME: airflow.executors.celery_executor
CELERY_BROKER: amqp BROKER_URL = sqla+mysql://airflow:airflow@localhost:3306/airflow
CELERY_RESULTS_BACKEND: amqp:// CELERY_RESULT_BACKEND = db+mysql://airflow:airflow@localhost:3306/airflow
[hooks] [hooks]
HIVE_HOME_PY: '/usr/lib/hive/lib/py' HIVE_HOME_PY: '/usr/lib/hive/lib/py'

Просмотреть файл

@ -151,6 +151,11 @@ def master(args):
job.run() job.run()
def worker(args):
from airflow.executors.celery_executor import app
app.start()
def initdb(args): def initdb(args):
if raw_input( if raw_input(
@ -160,7 +165,6 @@ def initdb(args):
format=settings.SIMPLE_LOG_FORMAT) format=settings.SIMPLE_LOG_FORMAT)
from airflow import models from airflow import models
from airflow import jobs
logging.info("Dropping tables that exist") logging.info("Dropping tables that exist")
models.Base.metadata.drop_all(settings.engine) models.Base.metadata.drop_all(settings.engine)
@ -199,7 +203,6 @@ def initdb(args):
if __name__ == '__main__': if __name__ == '__main__':
reload(logging)
logging.root.handlers = [] logging.root.handlers = []
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -306,5 +309,9 @@ if __name__ == '__main__':
parser_initdb = subparsers.add_parser('initdb', help=ht) parser_initdb = subparsers.add_parser('initdb', help=ht)
parser_initdb.set_defaults(func=initdb) parser_initdb.set_defaults(func=initdb)
ht = "Start a Celery worker node"
parser_worker = subparsers.add_parser('worker', help=ht)
parser_worker.set_defaults(func=worker)
args = parser.parse_args() args = parser.parse_args()
args.func(args) args.func(args)

Просмотреть файл

@ -1,7 +1,19 @@
#from airflow.executors.celery_executor import CeleryExecutor import logging
from airflow.executors.local_executor import LocalExecutor
#from airflow.executors.sequential_executor import SequentialExecutor
# DEFAULT_EXECUTOR = CeleryExecutor() from airflow.settings import conf
_EXECUTOR = conf.get('core', 'EXECUTOR')
if _EXECUTOR == 'LocalExecutor':
from airflow.executors.local_executor import LocalExecutor
DEFAULT_EXECUTOR = LocalExecutor() DEFAULT_EXECUTOR = LocalExecutor()
#DEFAULT_EXECUTOR = SequentialExecutor() elif _EXECUTOR == 'LocalExecutor':
from airflow.executors.celery_executor import CeleryExecutor
DEFAULT_EXECUTOR = CeleryExecutor()
elif _EXECUTOR == 'SequentialExecutor':
from airflow.executors.sequential_executor import SequentialExecutor
DEFAULT_EXECUTOR = SequentialExecutor()
else:
raise Exception("Executor {0} not supported.".format(_EXECUTOR))
logging.info("Using executor " + _EXECUTOR)

Просмотреть файл

@ -1,76 +1,63 @@
import logging import logging
import multiprocessing
import time import time
from airflow.executors.base_executor import BaseExecutor from airflow.executors.base_executor import BaseExecutor
from airflow.configuration import conf from airflow.configuration import conf
from airflow.utils import State from airflow.utils import State
from celery_worker import execute_command
import subprocess
from celery import Celery
from celery import states as celery_states
'''
To start the celery worker, run the command:
airflow worker
'''
class CeleryConfig(object):
BROKER_URL = conf.get('celery', 'BROKER_URL')
CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND')
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
app = Celery(
conf.get('celery', 'CELERY_APP_NAME'),
config_source=CeleryConfig)
@app.task
def execute_command(command):
logging.info("Executing command in Celery " + command)
rc = subprocess.Popen(command, shell=True).wait()
if rc:
logging.error(rc)
raise Exception('Celery command failed')
class CeleryExecutor(BaseExecutor): class CeleryExecutor(BaseExecutor):
""" Submits the task to RabbitMQ, which is picked up and executed by a bunch
of worker processes """
def __init__(self, parallelism=1):
super(CeleryExecutor, self).__init__()
self.parallelism = parallelism
def start(self): def start(self):
self.queue = multiprocessing.JoinableQueue() self.tasks = {}
self.result_queue = multiprocessing.Queue() self.last_state = {}
self.workers = [
CelerySubmitter(self.queue, self.result_queue)
for i in xrange(self.parallelism)]
for w in self.workers:
w.start()
def execute_async(self, key, command): def execute_async(self, key, command):
self.queue.put((key, command)) self.tasks[key] = execute_command.delay(command)
self.last_state[key] = celery_states.PENDING
def heartbeat(self): def heartbeat(self):
while not self.result_queue.empty(): for key, async in self.tasks.items():
results = self.result_queue.get() if self.last_state[key] != async.state:
self.change_state(*results) if async.state == celery_states.SUCCESS:
self.change_state(key, State.SUCCESS)
elif async.state in celery_states.FAILURE_STATES:
self.change_state(key, State.FAILED)
self.last_state[key] = async.state
def end(self): def end(self):
# Sending poison pill to all worker print('entering end')
[self.queue.put(None) for w in self.workers] while any([
self.queue.join() async.state not in celery_states.READY_STATES
for async in self.tasks.values()]):
print str([async.state for async in self.tasks.values()])
class CelerySubmitter(multiprocessing.Process): time.sleep(5)
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
while True:
work = self.task_queue.get()
if work is None:
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
key, command = work
BASE_FOLDER = conf.get('core', 'BASE_FOLDER')
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +
"source init.sh;\n" +
command +
"'"
).format(**locals())
try:
res = execute_command.delay(command)
result = res.get()
except Exception as e:
self.result_queue.put((key, State.FAILED))
logging.exception(e)
raise e
self.result_queue.put((key, State.SUCCESS))
self.task_queue.task_done()
time.sleep(1)

Просмотреть файл

@ -1,24 +0,0 @@
import subprocess
import logging
from airflow.configuration import conf
from celery import Celery
# to start the celery worker, run the command:
# "celery -A airflow.executors.celery_worker worker --loglevel=info"
# app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
app = Celery(
conf.get('celery', 'CELERY_APP_NAME'),
backend=conf.get('celery', 'CELERY_BROKER'),
broker=conf.get('celery', 'CELERY_RESULTS_BACKEND'))
@app.task(name='airflow.executors.celery_worker.execute_command')
def execute_command(command):
logging.info("Executing command in Celery " + command)
try:
subprocess.Popen(command, shell=True).wait()
except Exception as e:
raise e
return True