This commit is contained in:
Maxime 2015-05-23 17:59:37 +00:00
Родитель 5a2f9dab24
Коммит 6789a42077
3 изменённых файлов: 7 добавлений и 5 удалений

Просмотреть файл

@ -498,7 +498,7 @@ def get_parser():
parser_worker.add_argument(
"-q", "--queues",
help="Coma delimeted list of queues to cater serve",
default=conf.get('celery', 'celery_default_queue'))
default=conf.get('celery', 'DEFAULT_QUEUE'))
parser_worker.set_defaults(func=worker)
ht = "Serve logs generate by worker"

Просмотреть файл

@ -24,7 +24,7 @@ defaults = {
'authenticate': False,
},
'celery': {
'celery_default_queue': 'default',
'default_queue': 'default',
},
}
@ -57,6 +57,7 @@ worker_log_server_port = 8793
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
flower_port = 8383
default_queue = default
[scheduler]
job_heartbeat_sec = 5
@ -92,6 +93,7 @@ worker_log_server_port = 8793
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
flower_port = 5555
default_queue = default
[scheduler]
job_heartbeat_sec = 1

Просмотреть файл

@ -15,7 +15,7 @@ To start the celery worker, run the command:
airflow worker
'''
CELERY_DEFAULT_QUEUE = conf.get('celery', 'CELERY_DEFAULT_QUEUE')
DEFAULT_QUEUE = conf.get('celery', 'DEFAULT_QUEUE')
class CeleryConfig(object):
@ -25,7 +25,7 @@ class CeleryConfig(object):
BROKER_URL = conf.get('celery', 'BROKER_URL')
CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND')
CELERYD_CONCURRENCY = conf.getint('celery', 'CELERYD_CONCURRENCY')
CELERY_DEFAULT_QUEUE = CELERY_DEFAULT_QUEUE
CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE
app = Celery(
conf.get('celery', 'CELERY_APP_NAME'),
@ -55,7 +55,7 @@ class CeleryExecutor(BaseExecutor):
self.tasks = {}
self.last_state = {}
def execute_async(self, key, command, queue=CELERY_DEFAULT_QUEUE):
def execute_async(self, key, command, queue=DEFAULT_QUEUE):
self.tasks[key] = execute_command.apply_async(
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING