Use the generic term "signature"

This commit is contained in:
Ask Solem 2013-10-21 14:40:14 +01:00
Родитель 4634cb6eac
Коммит a71e02cda7
16 изменённых файлов: 139 добавлений и 128 удалений

Просмотреть файл

@ -16,8 +16,8 @@ __homepage__ = 'http://celeryproject.org'
__docformat__ = 'restructuredtext'
__all__ = [
'Celery', 'bugreport', 'shared_task', 'task',
'current_app', 'current_task',
'chain', 'chord', 'chunks', 'group', 'subtask',
'current_app', 'current_task', 'maybe_signature',
'chain', 'chord', 'chunks', 'group', 'signature',
'xmap', 'xstarmap', 'uuid', 'version', '__version__',
]
VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
@ -50,7 +50,8 @@ if STATICA_HACK: # pragma: no cover
from celery.app.task import Task # noqa
from celery._state import current_app, current_task # noqa
from celery.canvas import ( # noqa
chain, chord, chunks, group, subtask, xmap, xstarmap,
chain, chord, chunks, group,
signature, maybe_signature, xmap, xstarmap, subtask,
)
from celery.utils import uuid # noqa
@ -129,7 +130,8 @@ old_module, new_module = recreate_module( # pragma: no cover
'celery.app.task': ['Task'],
'celery._state': ['current_app', 'current_task'],
'celery.canvas': ['chain', 'chord', 'chunks', 'group',
'subtask', 'xmap', 'xstarmap'],
'signature', 'maybe_signature', 'subtask',
'xmap', 'xstarmap'],
'celery.utils': ['uuid'],
},
direct={'task': 'celery.task'},

Просмотреть файл

@ -62,7 +62,7 @@ def add_unlock_chord_task(app):
It joins chords by creating a task chain polling the header for completion.
"""
from celery.canvas import subtask
from celery.canvas import signature
from celery.exceptions import ChordError
from celery.result import from_serializable
@ -91,7 +91,7 @@ def add_unlock_chord_task(app):
j = deps.join_native if deps.supports_native_join else deps.join
if deps.ready():
callback = subtask(callback)
callback = signature(callback)
try:
ret = j(propagate=propagate)
except Exception as exc:
@ -122,22 +122,22 @@ def add_unlock_chord_task(app):
@shared_task
def add_map_task(app):
from celery.canvas import subtask
from celery.canvas import signature
@app.task(name='celery.map', _force_evaluate=True)
def xmap(task, it):
task = subtask(task).type
task = signature(task).type
return [task(item) for item in it]
return xmap
@shared_task
def add_starmap_task(app):
from celery.canvas import subtask
from celery.canvas import signature
@app.task(name='celery.starmap', _force_evaluate=True)
def xstarmap(task, it):
task = subtask(task).type
task = signature(task).type
return [task(*item) for item in it]
return xstarmap
@ -155,7 +155,7 @@ def add_chunk_task(app):
@shared_task
def add_group_task(app):
_app = app
from celery.canvas import maybe_subtask, subtask
from celery.canvas import maybe_signature, signature
from celery.result import from_serializable
class Group(app.Task):
@ -167,7 +167,7 @@ def add_group_task(app):
app = self.app
result = from_serializable(result, app)
# any partial args are added to all tasks in the group
taskit = (subtask(task).clone(partial_args)
taskit = (signature(task).clone(partial_args)
for i, task in enumerate(tasks))
if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
return app.GroupResult(
@ -188,7 +188,7 @@ def add_group_task(app):
options.setdefault('task_id', uuid()))
def prepare_member(task):
task = maybe_subtask(task)
task = maybe_signature(task)
opts = task.options
opts['group_id'] = group_id
try:
@ -225,7 +225,7 @@ def add_group_task(app):
@shared_task
def add_chain_task(app):
from celery.canvas import Signature, chord, group, maybe_subtask
from celery.canvas import Signature, chord, group, maybe_signature
_app = app
class Chain(app.Task):
@ -240,7 +240,7 @@ def add_chain_task(app):
i = 0
while steps:
# First task get partial args from chain.
task = maybe_subtask(steps.popleft())
task = maybe_signature(steps.popleft())
task = task.clone() if i else task.clone(args)
res = task.freeze()
i += 1
@ -292,10 +292,10 @@ def add_chain_task(app):
tasks[0].apply_async()
return result
def apply(self, args=(), kwargs={}, subtask=maybe_subtask, **options):
def apply(self, args=(), kwargs={}, signature=maybe_signature, **options):
last, fargs = None, args # fargs passed to first task only
for task in kwargs['tasks']:
res = subtask(task).clone(fargs).apply(last and (last.get(), ))
res = signature(task).clone(fargs).apply(last and (last.get(), ))
res.parent, last, fargs = last, res, None
return last
return Chain
@ -304,10 +304,10 @@ def add_chain_task(app):
@shared_task
def add_chord_task(app):
"""Every chord is executed in a dedicated task, so that the chord
can be used as a subtask, and this generates the task
can be used as a signature, and this generates the task
responsible for that."""
from celery import group
from celery.canvas import maybe_subtask
from celery.canvas import maybe_signature
_app = app
default_propagate = app.conf.CELERY_CHORD_PROPAGATES
@ -327,7 +327,7 @@ def add_chord_task(app):
# - convert back to group if serialized
tasks = header.tasks if isinstance(header, group) else header
header = group([maybe_subtask(s).clone() for s in tasks])
header = group([maybe_signature(s).clone() for s in tasks])
# - eager applies the group inline
if eager:
return header.apply(args=partial_args, task_id=group_id)
@ -361,8 +361,8 @@ def add_chord_task(app):
return self.apply(args, kwargs, **options)
header = kwargs.pop('header')
body = kwargs.pop('body')
header, body = (list(maybe_subtask(header)),
maybe_subtask(body))
header, body = (list(maybe_signature(header)),
maybe_signature(body))
# forward certain options to body
if chord is not None:
body.options['chord'] = chord
@ -380,6 +380,6 @@ def add_chord_task(app):
body = kwargs['body']
res = super(Chord, self).apply(args, dict(kwargs, eager=True),
**options)
return maybe_subtask(body).apply(
return maybe_signature(body).apply(
args=(res.get(propagate=propagate).get(), ))
return Chord

Просмотреть файл

@ -15,7 +15,7 @@ from billiard.einfo import ExceptionInfo
from celery import current_app
from celery import states
from celery._state import _task_stack
from celery.canvas import subtask
from celery.canvas import signature
from celery.exceptions import MaxRetriesExceededError, Reject, Retry
from celery.five import class_property, items, with_metaclass
from celery.result import EagerResult
@ -461,9 +461,9 @@ class Task(object):
:func:`kombu.compression.register`. Defaults to
the :setting:`CELERY_MESSAGE_COMPRESSION`
setting.
:keyword link: A single, or a list of subtasks to apply if the
:keyword link: A single, or a list of tasks to apply if the
task exits successfully.
:keyword link_error: A single, or a list of subtasks to apply
:keyword link_error: A single, or a list of tasks to apply
if an error occurs while executing the task.
:keyword producer: :class:~@amqp.TaskProducer` instance to use.
@ -678,11 +678,11 @@ class Task(object):
task_name=self.name, **kwargs)
def subtask(self, args=None, *starargs, **starkwargs):
"""Return :class:`~celery.subtask` object for
"""Return :class:`~celery.signature` object for
this task, wrapping arguments and execution options
for a single task invocation."""
starkwargs.setdefault('app', self.app)
return subtask(self, args, *starargs, **starkwargs)
return signature(self, args, *starargs, **starkwargs)
def s(self, *args, **kwargs):
"""``.s(*a, **k) -> .subtask(a, k)``"""

Просмотреть файл

@ -191,7 +191,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
success_receivers = signals.task_success.receivers
from celery import canvas
subtask = canvas.subtask
signature = canvas.maybe_signature # maybe_ does not clone if already
def trace_task(uuid, args, kwargs, request=None):
R = I = None
@ -233,14 +233,14 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
I = Info(FAILURE, exc)
state, retval = I.state, I.retval
R = I.handle_error_state(task, eager=eager)
[subtask(errback).apply_async((uuid, ))
[signature(errback).apply_async((uuid, ))
for errback in task_request.errbacks or []]
except BaseException as exc:
raise
else:
# callback tasks must be applied before the result is
# stored, so that result.children is populated.
[subtask(callback).apply_async((retval, ))
[signature(callback).apply_async((retval, ))
for callback in task_request.callbacks or []]
if publish_result:
store_result(

Просмотреть файл

@ -454,7 +454,7 @@ class KeyValueStoreBackend(BaseBackend):
def on_chord_part_return(self, task, propagate=None):
if not self.implements_incr:
return
from celery import subtask
from celery import signature
from celery.result import GroupResult
app = self.app
if propagate is None:
@ -465,14 +465,14 @@ class KeyValueStoreBackend(BaseBackend):
key = self.get_key_for_chord(gid)
deps = GroupResult.restore(gid, backend=task.backend)
if deps is None:
callback = subtask(task.request.chord)
callback = signature(task.request.chord)
return app._tasks[callback.task].backend.fail_from_current_stack(
callback.id,
exc=ChordError('GroupResult {0} no longer exists'.format(gid))
)
val = self.incr(key)
if val >= len(deps):
callback = subtask(task.request.chord)
callback = signature(task.request.chord)
j = deps.join_native if deps.supports_native_join else deps.join
try:
ret = j(propagate=propagate)

Просмотреть файл

@ -29,7 +29,7 @@ from celery.utils.functional import (
from celery.utils.text import truncate
__all__ = ['Signature', 'chain', 'xmap', 'xstarmap', 'chunks',
'group', 'chord', 'subtask', 'maybe_subtask']
'group', 'chord', 'signature', 'maybe_signature']
class _getitem_property(object):
@ -99,8 +99,8 @@ class Signature(dict):
arguments will be ignored and the values in the dict will be used
instead.
>>> s = subtask('tasks.add', args=(2, 2))
>>> subtask(s)
>>> s = signature('tasks.add', args=(2, 2))
>>> signature(s)
{'task': 'tasks.add', args=(2, 2), kwargs={}, options={}}
"""
@ -469,7 +469,7 @@ class group(Signature):
gid = opts['task_id'] = uuid()
new_tasks, results = [], []
for task in self.tasks:
task = maybe_subtask(task).clone()
task = maybe_signature(task).clone()
results.append(task._freeze())
new_tasks.append(task)
self.tasks = self.kwargs['tasks'] = new_tasks
@ -501,7 +501,7 @@ class chord(Signature):
Signature.__init__(
self, task, args,
dict(kwargs, header=_maybe_group(header),
body=maybe_subtask(body)), **options
body=maybe_signature(body)), **options
)
self.subtask_type = 'chord'
@ -557,15 +557,17 @@ class chord(Signature):
body = _getitem_property('kwargs.body')
def subtask(varies, *args, **kwargs):
def signature(varies, *args, **kwargs):
if not (args or kwargs) and isinstance(varies, dict):
if isinstance(varies, Signature):
return varies.clone()
return Signature.from_dict(varies)
return Signature(varies, *args, **kwargs)
subtask = signature # XXX compat
def maybe_subtask(d):
def maybe_signature(d):
if d is not None and isinstance(d, dict) and not isinstance(d, Signature):
return subtask(d)
return signature(d)
return d
maybe_subtask = maybe_signature # XXX compat

Просмотреть файл

@ -11,9 +11,11 @@ from __future__ import absolute_import
from celery._state import get_current_worker_task
from celery.app import app_or_default
from celery.canvas import subtask, maybe_subtask # noqa
from celery.canvas import maybe_signature # noqa
from celery.utils import uuid, warn_deprecated
from celery.canvas import subtask # noqa
warn_deprecated(
'celery.task.sets and TaskSet', removal='4.0',
alternative="""\
@ -38,7 +40,7 @@ class TaskSet(list):
app = None
def __init__(self, tasks=None, app=None, Publisher=None):
super(TaskSet, self).__init__(maybe_subtask(t) for t in tasks or [])
super(TaskSet, self).__init__(maybe_signature(t) for t in tasks or [])
self.app = app_or_default(app or self.app)
self.Publisher = Publisher or self.app.amqp.TaskProducer
self.total = len(self) # XXX compat

Просмотреть файл

@ -8,7 +8,7 @@ from contextlib import contextmanager
from kombu.utils.encoding import str_to_bytes
from mock import Mock, patch
from celery import subtask
from celery import signature
from celery import states
from celery.backends.cache import CacheBackend, DummyClient
from celery.exceptions import ImproperlyConfigured
@ -75,7 +75,7 @@ class test_CacheBackend(AppCase):
task = Mock()
task.name = 'foobarbaz'
self.app.tasks['foobarbaz'] = task
task.request.chord = subtask(task)
task.request.chord = signature(task)
gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
task.request.group = gid

Просмотреть файл

@ -8,7 +8,7 @@ from pickle import loads, dumps
from kombu.utils import cached_property, uuid
from celery import subtask
from celery import signature
from celery import states
from celery.datastructures import AttributeDict
from celery.exceptions import ImproperlyConfigured
@ -157,7 +157,7 @@ class test_RedisBackend(AppCase):
task = Mock()
task.name = 'foobarbaz'
self.app.tasks['foobarbaz'] = task
task.request.chord = subtask(task)
task.request.chord = signature(task)
task.request.group = 'group_id'
b.on_chord_part_return(task)

Просмотреть файл

@ -2,7 +2,7 @@ from __future__ import absolute_import
import time
from celery import task, subtask
from celery import task, signature
@task()
@ -14,7 +14,7 @@ def add(x, y):
def add_cb(x, y, callback=None):
result = x + y
if callback:
return subtask(callback).apply_async(result)
return signature(callback).apply_async(result)
return result

Просмотреть файл

@ -7,12 +7,12 @@ from celery.canvas import (
chain,
group,
chord,
subtask,
signature,
xmap,
xstarmap,
chunks,
_maybe_group,
maybe_subtask,
maybe_signature,
)
from celery.result import EagerResult
@ -80,14 +80,14 @@ class test_Signature(CanvasCase):
)
def test_link(self):
x = subtask(SIG)
x = signature(SIG)
x.link(SIG)
x.link(SIG)
self.assertIn(SIG, x.options['link'])
self.assertEqual(len(x.options['link']), 1)
def test_link_error(self):
x = subtask(SIG)
x = signature(SIG)
x.link_error(SIG)
x.link_error(SIG)
self.assertIn(SIG, x.options['link_error'])
@ -146,11 +146,11 @@ class test_Signature(CanvasCase):
self.assertEqual(r.id, 'foo')
def test_AsyncResult_when_not_registered(self):
s = subtask('xxx.not.registered', app=self.app)
s = signature('xxx.not.registered', app=self.app)
self.assertTrue(s.AsyncResult)
def test_apply_async_when_not_registered(self):
s = subtask('xxx.not.registered', app=self.app)
s = signature('xxx.not.registered', app=self.app)
self.assertTrue(s._apply_async)
@ -205,8 +205,8 @@ class test_chain(CanvasCase):
def test_reverse(self):
x = self.add.s(2, 2) | self.add.s(2)
self.assertIsInstance(subtask(x), chain)
self.assertIsInstance(subtask(dict(x)), chain)
self.assertIsInstance(signature(x), chain)
self.assertIsInstance(signature(dict(x)), chain)
def test_always_eager(self):
self.app.conf.CELERY_ALWAYS_EAGER = True
@ -253,8 +253,8 @@ class test_group(CanvasCase):
def test_reverse(self):
x = group([self.add.s(2, 2), self.add.s(4, 4)])
self.assertIsInstance(subtask(x), group)
self.assertIsInstance(subtask(dict(x)), group)
self.assertIsInstance(signature(x), group)
self.assertIsInstance(signature(dict(x)), group)
def test_maybe_group_sig(self):
self.assertListEqual(
@ -287,8 +287,8 @@ class test_chord(CanvasCase):
def test_reverse(self):
x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
self.assertIsInstance(subtask(x), chord)
self.assertIsInstance(subtask(dict(x)), chord)
self.assertIsInstance(signature(x), chord)
self.assertIsInstance(signature(dict(x)), chord)
def test_clone_clones_body(self):
x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
@ -318,14 +318,14 @@ class test_chord(CanvasCase):
self.assertIn('without body', repr(x))
class test_maybe_subtask(CanvasCase):
class test_maybe_signature(CanvasCase):
def test_is_None(self):
self.assertIsNone(maybe_subtask(None))
self.assertIsNone(maybe_signature(None))
def test_is_dict(self):
self.assertIsInstance(maybe_subtask(dict(self.add.s())), Signature)
self.assertIsInstance(maybe_signature(dict(self.add.s())), Signature)
def test_when_sig(self):
s = self.add.s()
self.assertIs(maybe_subtask(s), s)
self.assertIs(maybe_signature(s), s)

Просмотреть файл

@ -141,8 +141,8 @@ class test_unlock_chord_task(ChordCase):
fail_current = self.app.backend.fail_from_current_stack = Mock()
try:
with patch_unlock_retry(self.app) as (unlock, retry):
subtask, canvas.maybe_subtask = (
canvas.maybe_subtask, passthru,
subtask, canvas.maybe_signature = (
canvas.maybe_signature, passthru,
)
if setup:
setup(callback)
@ -154,7 +154,7 @@ class test_unlock_chord_task(ChordCase):
GroupResult=ResultCls, **kwargs
)
finally:
canvas.maybe_subtask = subtask
canvas.maybe_signature = subtask
yield callback_s, retry, fail_current
finally:
result.GroupResult = pts

Просмотреть файл

@ -274,11 +274,11 @@ class test_Gossip(AppCase):
g = Gossip(c)
g.start(c)
with patch('celery.worker.consumer.subtask') as subtask:
sig = subtask.return_value = Mock()
with patch('celery.worker.consumer.signature') as signature:
sig = signature.return_value = Mock()
task = Mock()
g.call_task(task)
subtask.assert_called_with(task)
signature.assert_called_with(task)
sig.apply_async.assert_called_with()
sig.apply_async.side_effect = MemoryError()

Просмотреть файл

@ -33,7 +33,7 @@ from kombu.utils.limits import TokenBucket
from celery import bootsteps
from celery.app.trace import build_tracer
from celery.canvas import subtask
from celery.canvas import signature
from celery.exceptions import InvalidTaskError
from celery.five import items, values
from celery.utils.functional import noop
@ -625,8 +625,7 @@ class Gossip(bootsteps.ConsumerStep):
def call_task(self, task):
try:
X = subtask(task)
X.apply_async()
signature(task).apply_async()
except Exception as exc:
error('Could not call task: %r', exc, exc_info=1)

Просмотреть файл

@ -10,28 +10,32 @@
.. _canvas-subtasks:
Subtasks
========
.. _canvas-signatures:
Signatures
==========
.. versionadded:: 2.0
You just learned how to call a task using the tasks ``delay`` method
in the :ref:`calling <guide-calling>` guide, and this is often all you need,
but sometimes you may want to pass the signature of a task invocation to
another process or as an argument to another function, for this Celery uses
something called *subtasks*.
another process or as an argument to another function.
A :func:`~celery.subtask` wraps the arguments, keyword arguments, and execution options
A :func:`~celery.signature` wraps the arguments, keyword arguments, and execution options
of a single task invocation in a way such that it can be passed to functions
or even serialized and sent across the wire.
- You can create a subtask for the ``add`` task using its name like this::
Signatures are often nicknamed "subtasks" because they descripe a task to be called
within a task.
>>> from celery import subtask
>>> subtask('tasks.add', args=(2, 2), countdown=10)
- You can create a signature for the ``add`` task using its name like this::
>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
This subtask has a signature of arity 2 (two arguments): ``(2, 2)``,
This task has a signature of arity 2 (two arguments): ``(2, 2)``,
and sets the countdown execution option to 10.
- or you can create one using the task's ``subtask`` method::
@ -49,7 +53,7 @@ or even serialized and sent across the wire.
>>> add.s(2, 2, debug=True)
tasks.add(2, 2, debug=True)
- From any subtask instance you can inspect the different fields::
- From any signature instance you can inspect the different fields::
>>> s = add.subtask((2, 2), {'debug': True}, countdown=10)
>>> s.args
@ -62,7 +66,7 @@ or even serialized and sent across the wire.
- It supports the "Calling API" which means it supports ``delay`` and
``apply_async`` or being called directly.
Calling the subtask will execute the task inline in the current process::
Calling the signature will execute the task inline in the current process::
>>> add(2, 2)
4
@ -93,7 +97,7 @@ or even serialized and sent across the wire.
Partials
--------
You can execute the subtask in a worker::
With a signature, you can execute the task in a worker::
>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)
@ -125,7 +129,7 @@ creates partials:
>>> s = add.subtask((2, 2), countdown=10)
>>> s.apply_async(countdown=1) # countdown is now 1
You can also clone subtasks to create derivates:
You can also clone signatures to create derivates:
>>> s = add.s(2)
proj.tasks.add(2)
@ -141,28 +145,28 @@ Immutability
Partials are meant to be used with callbacks, any tasks linked or chord
callbacks will be applied with the result of the parent task.
Sometimes you want to specify a callback that does not take
additional arguments, and in that case you can set the subtask
additional arguments, and in that case you can set the signature
to be immutable::
>>> add.apply_async((2, 2), link=reset_buffers.subtask(immutable=True))
The ``.si()`` shortcut can also be used to create immutable subtasks::
The ``.si()`` shortcut can also be used to create immutable signatures::
>>> add.apply_async((2, 2), link=reset_buffers.si())
Only the execution options can be set when a subtask is immutable,
so it's not possible to call the subtask with partial args/kwargs.
Only the execution options can be set when a signature is immutable,
so it's not possible to call the signature with partial args/kwargs.
.. note::
In this tutorial I sometimes use the prefix operator `~` to subtasks.
In this tutorial I sometimes use the prefix operator `~` to signatures.
You probably shouldn't use it in your production code, but it's a handy shortcut
when experimenting in the Python shell::
>>> ~subtask
>>> ~sig
>>> # is the same as
>>> subtask.delay().get()
>>> sig.delay().get()
.. _canvas-callbacks:
@ -175,19 +179,19 @@ Callbacks
Callbacks can be added to any task using the ``link`` argument
to ``apply_async``::
add.apply_async((2, 2), link=other_task.subtask())
add.apply_async((2, 2), link=other_task.s())
The callback will only be applied if the task exited successfully,
and it will be applied with the return value of the parent task as argument.
As I mentioned earlier, any arguments you add to `subtask`,
will be prepended to the arguments specified by the subtask itself!
As I mentioned earlier, any arguments you add to a signature,
will be prepended to the arguments specified by the signature itself!
If you have the subtask::
If you have the signature::
>>> add.subtask(args=(10, ))
>>> sig = add.s(10)
`subtask.delay(result)` becomes::
then `sig.delay(result)` becomes::
>>> add.apply_async(args=(result, 10))
@ -196,7 +200,7 @@ If you have the subtask::
Now let's call our ``add`` task with a callback using partial
arguments::
>>> add.apply_async((2, 2), link=add.subtask((8, )))
>>> add.apply_async((2, 2), link=add.s(8))
As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.
@ -210,12 +214,12 @@ The Primitives
- ``group``
The group primitive is a subtask that takes a list of tasks that should
The group primitive is a signature that takes a list of tasks that should
be applied in parallel.
- ``chain``
The chain primitive lets us link together subtasks so that one is called
The chain primitive lets us link together signatures so that one is called
after the other, essentially forming a *chain* of callbacks.
- ``chord``
@ -253,7 +257,7 @@ The Primitives
tasks (each processing 10 items in sequence).
The primitives are also subtasks themselves, so that they can be combined
The primitives are also signature objects themselves, so that they can be combined
in any number of ways to compose complex workflows.
Here's some examples:
@ -277,13 +281,13 @@ Here's some examples:
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
16
- Immutable subtasks
- Immutable signatures
Signatures can be partial so arguments can be
added to the existing arguments, but you may not always want that,
for example if you don't want the result of the previous task in a chain.
In that case you can mark the subtask as immutable, so that the arguments
In that case you can mark the signature as immutable, so that the arguments
cannot be changed::
>>> add.subtask((2, 2), immutable=True)
@ -355,7 +359,7 @@ Here's some examples:
>>> chord((import_contact.s(c) for c in contacts),
... notify_complete.si(import_id)).apply_async()
Note the use of ``.si`` above which creates an immutable subtask.
Note the use of ``.si`` above which creates an immutable signature.
- Blow your mind by combining
@ -399,7 +403,7 @@ Here's some examples:
If you don't want to forward arguments to the group then
you can make the subtasks in the group immutable::
you can make the signatures in the group immutable::
>>> res = (add.s(4, 4) | group(add.si(i, i) for i in xrange(10)))()
>>> res.get()
@ -464,7 +468,7 @@ too::
....
You can link together as many tasks as you like,
and subtasks can be linked too::
and signatures can be linked too::
>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
@ -494,7 +498,7 @@ the error callbacks take the id of the parent task as argument instead:
task_id, result.result, result.traceback), file=fh)
To make it even easier to link tasks together there is
a special subtask called :class:`~celery.chain` that lets
a special signature called :class:`~celery.chain` that lets
you chain tasks together:
.. code-block:: python
@ -571,7 +575,7 @@ Groups
A group can be used to execute several tasks in parallel.
The :class:`~celery.group` function takes a list of subtasks::
The :class:`~celery.group` function takes a list of signatures::
>>> from celery import group
>>> from proj.tasks import add
@ -601,8 +605,8 @@ Group also supports iterators::
>>> group(add.s(i, i) for i in xrange(100))()
A group is a subtask instance, so it can be used in combination
with other subtasks.
A group is a signature object, so it can be used in combination
with other signatures.
Group Results
~~~~~~~~~~~~~
@ -615,11 +619,11 @@ that it works on the group as a whole::
>>> from tasks import add
>>> job = group([
... add.subtask((2, 2)),
... add.subtask((4, 4)),
... add.subtask((8, 8)),
... add.subtask((16, 16)),
... add.subtask((32, 32)),
... add.s(2, 2),
... add.s(4, 4),
... add.s(8, 8),
... add.s(16, 16),
... add.s(32, 32),
... ])
>>> result = job.apply_async()
@ -727,8 +731,8 @@ Let's break the chord expression down:
.. code-block:: python
>>> callback = tsum.subtask()
>>> header = [add.subtask((i, i)) for i in xrange(100)]
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
@ -815,17 +819,19 @@ Example decorated task:
do_something()
By default the synchronization step is implemented by having a recurring task
poll the completion of the group every second, calling the subtask when
poll the completion of the group every second, calling the signature when
ready.
Example implementation:
.. code-block:: python
from celery import maybe_signature
@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
if group.ready():
return subtask(callback).delay(group.join())
return maybe_signature(callback).delay(group.join())
raise self.retry(countdown=interval, max_retries=max_retries)
@ -895,8 +901,8 @@ is the same as having a task doing:
def temp():
return [add(i, i) for i in range(10)]
Both ``map`` and ``starmap`` are subtasks, so they can be used as
other subtasks and combined in groups etc., for example
Both ``map`` and ``starmap`` are signature objects, so they can be used as
other signatures and combined in groups etc., for example
to call the starmap after 10 seconds::
>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
@ -915,7 +921,7 @@ of parallelism, but this is rarely true for a busy cluster
and in practice since you are avoiding the overhead of messaging
it may considerably increase performance.
To create a chunks subtask you can use :meth:`@Task.chunks`:
To create a chunks signature you can use :meth:`@Task.chunks`:
.. code-block:: python

Просмотреть файл

@ -19,7 +19,7 @@
# ... A_callback.subtask()), countdown=1)
from celery import chord, group, task, subtask, uuid
from celery import chord, group, task, signature, uuid
from celery.result import AsyncResult, ResultSet
from collections import deque
@ -79,7 +79,7 @@ def unlock_graph(result, callback,
if result.ready():
second_level_res = result.get()
if second_level_res.ready():
subtask(callback).delay(list(joinall(
signature(callback).delay(list(joinall(
second_level_res, propagate=propagate)))
else:
unlock_graph.retry(countdown=interval, max_retries=max_retries)