Attempt at removing the recursive nature of chains

This commit is contained in:
Ask Solem 2014-05-07 16:33:19 +01:00
Родитель 61288aa2a8
Коммит 799013c553
2 изменённых файлов: 23 добавлений и 24 удалений

Просмотреть файл

@ -269,7 +269,8 @@ class AMQP(object):
expires=None, retries=0, chord=None,
callbacks=None, errbacks=None, reply_to=None,
time_limit=None, soft_time_limit=None,
create_sent_event=False, now=None, timezone=None):
create_sent_event=False, now=None, chain=None,
timezone=None):
args = args or ()
kwargs = kwargs or {}
utc = self.utc
@ -300,7 +301,6 @@ class AMQP(object):
'expires': expires,
'callbacks': callbacks,
'errbacks': errbacks,
'chain': None, # TODO
'group': group_id,
'chord': chord,
'retries': retries,
@ -310,7 +310,7 @@ class AMQP(object):
'correlation_id': task_id,
'reply_to': reply_to or '',
},
body=(args, kwargs),
body=(args, kwargs, chain),
sent_event={
'uuid': task_id,
'name': name,

Просмотреть файл

@ -217,11 +217,10 @@ def add_chain_task(app):
accept_magic_kwargs = False
_decorated = True
def prepare_steps(self, args, tasks):
def prepare_steps(self, args, tasks, link_error=None):
app = self.app
steps = deque(tasks)
next_step = prev_task = prev_res = None
tasks, results = [], []
i = 0
while steps:
# First task get partial args from chain.
@ -249,42 +248,42 @@ def add_chain_task(app):
except IndexError:
pass # no callback, so keep as group
if prev_task:
# link previous task to this task.
prev_task.link(task)
# set the results parent attribute.
if not res.parent:
res.parent = prev_res
if not isinstance(prev_task, chord):
results.append(res)
tasks.append(task)
prev_task, prev_res = task, res
if link_error:
task.set(link_error=link_error)
return tasks, results
if not isinstance(prev_task, chord):
yield task, res
prev_task, prev_res = task, res
def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
task_id=None, link=None, link_error=None, **options):
if self.app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, **options)
options.pop('publisher', None)
tasks, results = self.prepare_steps(args, kwargs['tasks'])
result = results[-1]
steps = list(self.prepare_steps(
args, kwargs['tasks'], link_error,
))
first_task = steps.pop(0)[0]
last_task, last_result = steps[-1]
if group_id:
tasks[-1].set(group_id=group_id)
last_task.set(group_id=group_id)
if chord:
tasks[-1].set(chord=chord)
last_task.set(chord=chord)
if task_id:
tasks[-1].set(task_id=task_id)
result = tasks[-1].type.AsyncResult(task_id)
last_task.set(task_id=task_id)
last_result = last_task.type.AsyncResult(task_id)
# make sure we can do a link() and link_error() on a chain object.
if link:
tasks[-1].set(link=link)
last_task.set(link=link)
# and if any task in the chain fails, call the errbacks
if link_error:
for task in tasks:
task.set(link_error=link_error)
tasks[0].apply_async(**options)
return result
first_task.set(chain=steps)
first_task.apply_async(**options)
return last_result
def apply(self, args=(), kwargs={}, signature=maybe_signature,
**options):