Renames Task.subtask -> .signature (with alias for compat)
This commit is contained in:
Родитель
61288aa2a8
Коммит
661bbfe984
|
@ -555,8 +555,8 @@ class Task(object):
|
|||
**dict(self._get_exec_options(), **options)
|
||||
)
|
||||
|
||||
def subtask_from_request(self, request=None, args=None, kwargs=None,
|
||||
queue=None, **extra_options):
|
||||
def signature_from_request(self, request=None, args=None, kwargs=None,
|
||||
queue=None, **extra_options):
|
||||
request = self.request if request is None else request
|
||||
args = request.args if args is None else args
|
||||
kwargs = request.kwargs if kwargs is None else kwargs
|
||||
|
@ -573,7 +573,10 @@ class Task(object):
|
|||
options.update(
|
||||
{'queue': queue} if queue else (request.delivery_info or {})
|
||||
)
|
||||
return self.subtask(args, kwargs, options, type=self, **extra_options)
|
||||
return self.signature(
|
||||
args, kwargs, options, type=self, **extra_options
|
||||
)
|
||||
subtask_from_request = signature_from_request
|
||||
|
||||
def retry(self, args=None, kwargs=None, exc=None, throw=True,
|
||||
eta=None, countdown=None, max_retries=None, **options):
|
||||
|
@ -647,7 +650,7 @@ class Task(object):
|
|||
countdown = self.default_retry_delay
|
||||
|
||||
is_eager = request.is_eager
|
||||
S = self.subtask_from_request(
|
||||
S = self.signature_from_request(
|
||||
request, args, kwargs,
|
||||
countdown=countdown, eta=eta, retries=retries,
|
||||
**options
|
||||
|
@ -748,20 +751,21 @@ class Task(object):
|
|||
return self._get_app().AsyncResult(task_id, backend=self.backend,
|
||||
task_name=self.name, **kwargs)
|
||||
|
||||
def subtask(self, args=None, *starargs, **starkwargs):
|
||||
def signature(self, args=None, *starargs, **starkwargs):
|
||||
"""Return :class:`~celery.signature` object for
|
||||
this task, wrapping arguments and execution options
|
||||
for a single task invocation."""
|
||||
starkwargs.setdefault('app', self.app)
|
||||
return signature(self, args, *starargs, **starkwargs)
|
||||
subtask = signature
|
||||
|
||||
def s(self, *args, **kwargs):
|
||||
"""``.s(*a, **k) -> .subtask(a, k)``"""
|
||||
return self.subtask(args, kwargs)
|
||||
"""``.s(*a, **k) -> .signature(a, k)``"""
|
||||
return self.signature(args, kwargs)
|
||||
|
||||
def si(self, *args, **kwargs):
|
||||
"""``.si(*a, **k) -> .subtask(a, k, immutable=True)``"""
|
||||
return self.subtask(args, kwargs, immutable=True)
|
||||
"""``.si(*a, **k) -> .signature(a, k, immutable=True)``"""
|
||||
return self.signature(args, kwargs, immutable=True)
|
||||
|
||||
def chunks(self, it, n):
|
||||
"""Creates a :class:`~celery.canvas.chunks` task for this task."""
|
||||
|
|
|
@ -24,6 +24,7 @@ __all__ = ['Task', 'PeriodicTask', 'task']
|
|||
#: list of methods that must be classmethods in the old API.
|
||||
_COMPAT_CLASSMETHODS = (
|
||||
'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
|
||||
'signature_from_request', 'signature',
|
||||
'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
|
||||
)
|
||||
|
||||
|
|
|
@ -142,7 +142,7 @@ class test_unlock_chord_task(ChordCase):
|
|||
fail_current = self.app.backend.fail_from_current_stack = Mock()
|
||||
try:
|
||||
with patch_unlock_retry(self.app) as (unlock, retry):
|
||||
subtask, canvas.maybe_signature = (
|
||||
signature, canvas.maybe_signature = (
|
||||
canvas.maybe_signature, passthru,
|
||||
)
|
||||
if setup:
|
||||
|
@ -160,7 +160,7 @@ class test_unlock_chord_task(ChordCase):
|
|||
except Retry:
|
||||
pass
|
||||
finally:
|
||||
canvas.maybe_signature = subtask
|
||||
canvas.maybe_signature = signature
|
||||
yield callback_s, retry, fail_current
|
||||
finally:
|
||||
result.GroupResult = pts
|
||||
|
@ -211,7 +211,7 @@ class test_chord(ChordCase):
|
|||
body = self.add.s(2)
|
||||
result = x(body)
|
||||
self.assertTrue(result.id)
|
||||
# does not modify original subtask
|
||||
# does not modify original signature
|
||||
with self.assertRaises(KeyError):
|
||||
body.options['task_id']
|
||||
self.assertTrue(chord._type.called)
|
||||
|
@ -228,6 +228,6 @@ class test_Chord_task(ChordCase):
|
|||
Chord = self.app.tasks['celery.chord']
|
||||
|
||||
body = dict()
|
||||
Chord(group(self.add.subtask((i, i)) for i in range(5)), body)
|
||||
Chord([self.add.subtask((j, j)) for j in range(5)], body)
|
||||
Chord(group(self.add.signature((i, i)) for i in range(5)), body)
|
||||
Chord([self.add.signature((j, j)) for j in range(5)], body)
|
||||
self.assertEqual(self.app.backend.apply_chord.call_count, 2)
|
||||
|
|
|
@ -275,7 +275,7 @@ so that no message is sent::
|
|||
|
||||
These three methods - :meth:`delay`, :meth:`apply_async`, and applying
|
||||
(``__call__``), represents the Celery calling API, which are also used for
|
||||
subtasks.
|
||||
signatures.
|
||||
|
||||
A more detailed overview of the Calling API can be found in the
|
||||
:ref:`Calling User Guide <guide-calling>`.
|
||||
|
@ -380,16 +380,16 @@ Calling tasks is described in detail in the
|
|||
You just learned how to call a task using the tasks ``delay`` method,
|
||||
and this is often all you need, but sometimes you may want to pass the
|
||||
signature of a task invocation to another process or as an argument to another
|
||||
function, for this Celery uses something called *subtasks*.
|
||||
function, for this Celery uses something called *signatures*.
|
||||
|
||||
A subtask wraps the arguments and execution options of a single task
|
||||
A signature wraps the arguments and execution options of a single task
|
||||
invocation in a way such that it can be passed to functions or even serialized
|
||||
and sent across the wire.
|
||||
|
||||
You can create a subtask for the ``add`` task using the arguments ``(2, 2)``,
|
||||
You can create a signature for the ``add`` task using the arguments ``(2, 2)``,
|
||||
and a countdown of 10 seconds like this::
|
||||
|
||||
>>> add.subtask((2, 2), countdown=10)
|
||||
>>> add.signature((2, 2), countdown=10)
|
||||
tasks.add(2, 2)
|
||||
|
||||
There is also a shortcut using star arguments::
|
||||
|
@ -400,12 +400,12 @@ There is also a shortcut using star arguments::
|
|||
And there's that calling API again…
|
||||
-----------------------------------
|
||||
|
||||
Subtask instances also supports the calling API, which means that they
|
||||
Signature instances also supports the calling API, which means that they
|
||||
have the ``delay`` and ``apply_async`` methods.
|
||||
|
||||
But there is a difference in that the subtask may already have
|
||||
But there is a difference in that the signature may already have
|
||||
an argument signature specified. The ``add`` task takes two arguments,
|
||||
so a subtask specifying two arguments would make a complete signature::
|
||||
so a signature specifying two arguments would make a complete signature::
|
||||
|
||||
>>> s1 = add.s(2, 2)
|
||||
>>> res = s1.delay()
|
||||
|
@ -418,8 +418,8 @@ But, you can also make incomplete signatures to create what we call
|
|||
# incomplete partial: add(?, 2)
|
||||
>>> s2 = add.s(2)
|
||||
|
||||
``s2`` is now a partial subtask that needs another argument to be complete,
|
||||
and this can be resolved when calling the subtask::
|
||||
``s2`` is now a partial signature that needs another argument to be complete,
|
||||
and this can be resolved when calling the signature::
|
||||
|
||||
# resolves the partial: add(8, 2)
|
||||
>>> res = s2.delay(8)
|
||||
|
@ -435,14 +435,14 @@ existing keyword arguments, but with new arguments taking precedence::
|
|||
>>> s3 = add.s(2, 2, debug=True)
|
||||
>>> s3.delay(debug=False) # debug is now False.
|
||||
|
||||
As stated subtasks supports the calling API, which means that:
|
||||
As stated signatures supports the calling API, which means that:
|
||||
|
||||
- ``subtask.apply_async(args=(), kwargs={}, **options)``
|
||||
- ``sig.apply_async(args=(), kwargs={}, **options)``
|
||||
|
||||
Calls the subtask with optional partial arguments and partial
|
||||
Calls the signature with optional partial arguments and partial
|
||||
keyword arguments. Also supports partial execution options.
|
||||
|
||||
- ``subtask.delay(*args, **kwargs)``
|
||||
- ``sig.delay(*args, **kwargs)``
|
||||
|
||||
Star argument version of ``apply_async``. Any arguments will be prepended
|
||||
to the arguments in the signature, and keyword arguments is merged with any
|
||||
|
@ -466,7 +466,7 @@ The Primitives
|
|||
- :ref:`starmap <canvas-map>`
|
||||
- :ref:`chunks <canvas-chunks>`
|
||||
|
||||
The primitives are subtasks themselves, so that they can be combined
|
||||
These primitives are signature objects themselves, so they can be combined
|
||||
in any number of ways to compose complex workflows.
|
||||
|
||||
.. note::
|
||||
|
@ -556,7 +556,7 @@ to a chord:
|
|||
90
|
||||
|
||||
|
||||
Since these primitives are all of the subtask type they
|
||||
Since these primitives are all of the signature type they
|
||||
can be combined almost however you want, e.g::
|
||||
|
||||
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
|
||||
|
|
|
@ -64,7 +64,7 @@ Naming
|
|||
Sometimes it makes sense to have a class mask as a function,
|
||||
and there is precedence for this in the stdlib (e.g.
|
||||
:class:`~contextlib.contextmanager`). Celery examples include
|
||||
:class:`~celery.subtask`, :class:`~celery.chord`,
|
||||
:class:`~celery.signature`, :class:`~celery.chord`,
|
||||
``inspect``, :class:`~kombu.utils.functional.promise` and more..
|
||||
|
||||
- Factory functions and methods must be `CamelCase` (excluding verbs):
|
||||
|
|
|
@ -71,7 +71,7 @@ to process it.
|
|||
The taskset this task is part of (if any).
|
||||
|
||||
* chord
|
||||
:`subtask`:
|
||||
:`Signature`:
|
||||
|
||||
.. versionadded:: 2.3
|
||||
|
||||
|
@ -88,18 +88,18 @@ to process it.
|
|||
should be used.
|
||||
|
||||
* callbacks
|
||||
:`<list>subtask`:
|
||||
:`<list>Signature`:
|
||||
|
||||
.. versionadded:: 3.0
|
||||
|
||||
A list of subtasks to apply if the task exited successfully.
|
||||
A list of signatures to call if the task exited successfully.
|
||||
|
||||
* errbacks
|
||||
:`<list>subtask`:
|
||||
:`<list>Signature`:
|
||||
|
||||
.. versionadded:: 3.0
|
||||
|
||||
A list of subtasks to apply if an error occurs while executing the task.
|
||||
A list of signatures to call if an error occurs while executing the task.
|
||||
|
||||
* timelimit
|
||||
:`<tuple>(float, float)`:
|
||||
|
|
|
@ -470,7 +470,7 @@ See :ref:`guide-canvas` for more about creating task workflows.
|
|||
|
||||
Signatures can also be created from tasks::
|
||||
|
||||
>>> add.subtask(args=(), kwargs={}, options={})
|
||||
>>> add.signature(args=(), kwargs={}, options={})
|
||||
|
||||
or the ``.s()`` shortcut::
|
||||
|
||||
|
|
|
@ -95,7 +95,7 @@ called `add`, returning the sum of two arguments:
|
|||
.. topic:: There's another way…
|
||||
|
||||
You will learn more about this later while reading about the :ref:`Canvas
|
||||
<guide-canvas>`, but :class:`~celery.subtask`'s are objects used to pass around
|
||||
<guide-canvas>`, but :class:`~celery.signature`'s are objects used to pass around
|
||||
the signature of a task invocation, (for example to send it over the
|
||||
network), and they also support the Calling API:
|
||||
|
||||
|
@ -118,8 +118,8 @@ as a partial argument:
|
|||
|
||||
.. sidebar:: What is ``s``?
|
||||
|
||||
The ``add.s`` call used here is called a subtask, I talk
|
||||
more about subtasks in the :ref:`canvas guide <guide-canvas>`,
|
||||
The ``add.s`` call used here is called a signature, I talk
|
||||
more about signatures in the :ref:`canvas guide <guide-canvas>`,
|
||||
where you can also learn about :class:`~celery.chain`, which
|
||||
is a simpler way to chain tasks together.
|
||||
|
||||
|
@ -447,7 +447,7 @@ Though this particular example is much better expressed as a group:
|
|||
>>> from celery import group
|
||||
|
||||
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
|
||||
>>> res = group(add.subtask(n) for i in numbers).apply_async()
|
||||
>>> res = group(add.s(n) for i in numbers).apply_async()
|
||||
|
||||
>>> res.get()
|
||||
[4, 8, 16, 32]
|
||||
|
|
|
@ -26,9 +26,6 @@ A :func:`~celery.signature` wraps the arguments, keyword arguments, and executio
|
|||
of a single task invocation in a way such that it can be passed to functions
|
||||
or even serialized and sent across the wire.
|
||||
|
||||
Signatures are often nicknamed "subtasks" because they describe a task to be called
|
||||
within a task.
|
||||
|
||||
- You can create a signature for the ``add`` task using its name like this::
|
||||
|
||||
>>> from celery import signature
|
||||
|
@ -38,9 +35,9 @@ within a task.
|
|||
This task has a signature of arity 2 (two arguments): ``(2, 2)``,
|
||||
and sets the countdown execution option to 10.
|
||||
|
||||
- or you can create one using the task's ``subtask`` method::
|
||||
- or you can create one using the task's ``signature`` method::
|
||||
|
||||
>>> add.subtask((2, 2), countdown=10)
|
||||
>>> add.signature((2, 2), countdown=10)
|
||||
tasks.add(2, 2)
|
||||
|
||||
- There is also a shortcut using star arguments::
|
||||
|
@ -55,7 +52,7 @@ within a task.
|
|||
|
||||
- From any signature instance you can inspect the different fields::
|
||||
|
||||
>>> s = add.subtask((2, 2), {'debug': True}, countdown=10)
|
||||
>>> s = add.signature((2, 2), {'debug': True}, countdown=10)
|
||||
>>> s.args
|
||||
(2, 2)
|
||||
>>> s.kwargs
|
||||
|
@ -82,10 +79,10 @@ within a task.
|
|||
``apply_async`` takes the same arguments as the :meth:`Task.apply_async <@Task.apply_async>` method::
|
||||
|
||||
>>> add.apply_async(args, kwargs, **options)
|
||||
>>> add.subtask(args, kwargs, **options).apply_async()
|
||||
>>> add.signature(args, kwargs, **options).apply_async()
|
||||
|
||||
>>> add.apply_async((2, 2), countdown=1)
|
||||
>>> add.subtask((2, 2), countdown=1).apply_async()
|
||||
>>> add.signature((2, 2), countdown=1).apply_async()
|
||||
|
||||
- You can't define options with :meth:`~@Task.s`, but a chaining
|
||||
``set`` call takes care of that::
|
||||
|
@ -125,7 +122,7 @@ creates partials:
|
|||
- Any options added will be merged with the options in the signature,
|
||||
with the new options taking precedence::
|
||||
|
||||
>>> s = add.subtask((2, 2), countdown=10)
|
||||
>>> s = add.signature((2, 2), countdown=10)
|
||||
>>> s.apply_async(countdown=1) # countdown is now 1
|
||||
|
||||
You can also clone signatures to create derivates:
|
||||
|
@ -147,7 +144,7 @@ Sometimes you want to specify a callback that does not take
|
|||
additional arguments, and in that case you can set the signature
|
||||
to be immutable::
|
||||
|
||||
>>> add.apply_async((2, 2), link=reset_buffers.subtask(immutable=True))
|
||||
>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
|
||||
|
||||
The ``.si()`` shortcut can also be used to create immutable signatures::
|
||||
|
||||
|
@ -289,7 +286,7 @@ Here's some examples:
|
|||
In that case you can mark the signature as immutable, so that the arguments
|
||||
cannot be changed::
|
||||
|
||||
>>> add.subtask((2, 2), immutable=True)
|
||||
>>> add.signature((2, 2), immutable=True)
|
||||
|
||||
There's also an ``.si`` shortcut for this::
|
||||
|
||||
|
@ -419,7 +416,7 @@ The linked task will be applied with the result of its parent
|
|||
task as the first argument, which in the above case will result
|
||||
in ``mul(4, 16)`` since the result is 4.
|
||||
|
||||
The results will keep track of what subtasks a task applies,
|
||||
The results will keep track of any subtasks called by the original task,
|
||||
and this can be accessed from the result instance::
|
||||
|
||||
>>> res.children
|
||||
|
@ -456,7 +453,7 @@ You can also add *error callbacks* using the ``link_error`` argument::
|
|||
|
||||
>>> add.apply_async((2, 2), link_error=log_error.s())
|
||||
|
||||
>>> add.subtask((2, 2), link_error=log_error.s())
|
||||
>>> add.signature((2, 2), link_error=log_error.s())
|
||||
|
||||
Since exceptions can only be serialized when pickle is used
|
||||
the error callbacks take the id of the parent task as argument instead:
|
||||
|
|
|
@ -266,9 +266,9 @@ The request defines the following attributes:
|
|||
:called_directly: This flag is set to true if the task was not
|
||||
executed by the worker.
|
||||
|
||||
:callbacks: A list of subtasks to be called if this task returns successfully.
|
||||
:callbacks: A list of signatures to be called if this task returns successfully.
|
||||
|
||||
:errback: A list of subtasks to be called if this task fails.
|
||||
:errback: A list of signatures to be called if this task fails.
|
||||
|
||||
:utc: Set to true the caller has utc enabled (:setting:`CELERY_ENABLE_UTC`).
|
||||
|
||||
|
@ -1297,7 +1297,7 @@ Make your design asynchronous instead, for example by using *callbacks*.
|
|||
|
||||
|
||||
Here I instead created a chain of tasks by linking together
|
||||
different :func:`~celery.subtask`'s.
|
||||
different :func:`~celery.signature`'s.
|
||||
You can read about chains and other powerful constructs
|
||||
at :ref:`designing-workflows`.
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
# when the second task is ready.)
|
||||
#
|
||||
# >>> unlock_graph.apply_async((A.apply_async(),
|
||||
# ... A_callback.subtask()), countdown=1)
|
||||
# ... A_callback.s()), countdown=1)
|
||||
|
||||
|
||||
from celery import chord, group, task, signature, uuid
|
||||
|
|
Загрузка…
Ссылка в новой задаче