This commit is contained in:
Maxime 2015-05-21 15:11:15 +00:00
Родитель e59e7ba43c
Коммит 5a2f9dab24
4 изменённых файлов: 5 добавлений и 3 удалений

Просмотреть файл

@ -286,7 +286,7 @@ def worker(args):
options = {
'optimization': 'fair',
'O': 'fair',
'Q': args.queues,
'queues': args.queues,
}
worker.run(**options)
sp.kill()

Просмотреть файл

@ -47,7 +47,7 @@ class BaseExecutor(object):
task_instance.key,
command,
priority=task_instance.task.priority_weight_total,
queue=task_instance.queue)
queue=task_instance.task.queue)
def sync(self):
"""

Просмотреть файл

@ -56,7 +56,8 @@ class CeleryExecutor(BaseExecutor):
self.last_state = {}
def execute_async(self, key, command, queue=CELERY_DEFAULT_QUEUE):
self.tasks[key] = execute_command.delay(command, queue=queue)
self.tasks[key] = execute_command.apply_async(
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING
def sync(self):

Просмотреть файл

@ -970,6 +970,7 @@ class BaseOperator(Base):
self.wait_for_downstream = wait_for_downstream
self._schedule_interval = schedule_interval
self.retries = retries
self.queue = queue
if isinstance(retry_delay, timedelta):
self.retry_delay = retry_delay
else: