Adds new `task_remote_tracebacks` setting using tblib to format tracebacks.
This commit is contained in:
Родитель
8c8b2bd99b
Коммит
592b01b90d
|
@ -190,6 +190,7 @@ NAMESPACES = Namespace(
|
|||
),
|
||||
persistent=Option(None, type='bool'),
|
||||
serializer=Option('json'),
|
||||
fspath=Option(None),
|
||||
),
|
||||
riak=Namespace(
|
||||
__old__=old_ns('celery_riak'),
|
||||
|
@ -245,6 +246,7 @@ NAMESPACES = Namespace(
|
|||
queue_ha_policy=Option(None, type='string'),
|
||||
queue_max_priority=Option(None, type='int'),
|
||||
reject_on_worker_lost=Option(type='bool'),
|
||||
remote_tracebacks=Option(False, type='bool'),
|
||||
routes=Option(type='any'),
|
||||
send_error_emails=Option(
|
||||
False, type='bool', old={'celery_send_task_error_emails'},
|
||||
|
|
|
@ -28,8 +28,15 @@ from .five import (
|
|||
)
|
||||
from .utils import deprecated
|
||||
|
||||
__all__ = ['ResultBase', 'AsyncResult', 'ResultSet', 'GroupResult',
|
||||
'EagerResult', 'result_from_tuple']
|
||||
try:
|
||||
import tblib
|
||||
except ImportError:
|
||||
tblib = None
|
||||
|
||||
__all__ = [
|
||||
'ResultBase', 'AsyncResult', 'ResultSet',
|
||||
'GroupResult', 'EagerResult', 'result_from_tuple',
|
||||
]
|
||||
|
||||
E_WOULDBLOCK = """\
|
||||
Never call result.get() within a task!
|
||||
|
@ -280,13 +287,17 @@ class AsyncResult(ResultBase):
|
|||
|
||||
def maybe_throw(self, propagate=True, callback=None):
|
||||
cache = self._get_task_meta() if self._cache is None else self._cache
|
||||
state, value = cache['status'], cache['result']
|
||||
state, value, tb = cache['status'], cache['result'], cache.get('traceback')
|
||||
if state in states.PROPAGATE_STATES and propagate:
|
||||
self.throw(value)
|
||||
self.throw(value, self._to_remote_traceback(tb))
|
||||
if callback is not None:
|
||||
callback(self.id, value)
|
||||
return value
|
||||
|
||||
def _to_remote_traceback(self, tb):
|
||||
if tb and tblib is not None and self.app.conf.task_remote_tracebacks:
|
||||
return tblib.Traceback.from_string(tbstring).as_traceback()
|
||||
|
||||
def build_graph(self, intermediate=False, formatter=None):
|
||||
graph = DependencyGraph(
|
||||
formatter=formatter or GraphFormatter(root=self.id, shape='oval'),
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import traceback
|
||||
from contextlib import contextmanager
|
||||
|
||||
from celery import states
|
||||
|
@ -22,13 +23,22 @@ from celery.tests.case import (
|
|||
AppCase, Mock, call, depends_on_current_app, patch,
|
||||
)
|
||||
|
||||
PYTRACEBACK = """\
|
||||
Traceback (most recent call last):
|
||||
File "foo.py", line 2, in foofunc
|
||||
don't matter
|
||||
File "bar.py", line 3, in barfunc
|
||||
don't matter
|
||||
Doesn't matter: really!\
|
||||
"""
|
||||
|
||||
def mock_task(name, state, result):
|
||||
return dict(id=uuid(), name=name, state=state, result=result)
|
||||
|
||||
def mock_task(name, state, result, traceback=None):
|
||||
return dict(id=uuid(), name=name, state=state, result=result, traceback=traceback)
|
||||
|
||||
|
||||
def save_result(app, task):
|
||||
traceback = 'Some traceback'
|
||||
traceback = task.get('traceback') or 'Some traceback'
|
||||
if task['state'] == states.SUCCESS:
|
||||
app.backend.mark_as_done(task['id'], task['result'])
|
||||
elif task['state'] == states.RETRY:
|
||||
|
@ -56,8 +66,11 @@ class test_AsyncResult(AppCase):
|
|||
self.task2 = mock_task('task2', states.SUCCESS, 'quick')
|
||||
self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
|
||||
self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
|
||||
self.task5 = mock_task(
|
||||
'task3', states.FAILURE, KeyError('blue'), PYTRACEBACK,
|
||||
)
|
||||
|
||||
for task in (self.task1, self.task2, self.task3, self.task4):
|
||||
for task in (self.task1, self.task2, self.task3, self.task4, self.task5):
|
||||
save_result(self.app, task)
|
||||
|
||||
@self.app.task(shared=False)
|
||||
|
@ -202,6 +215,35 @@ class test_AsyncResult(AppCase):
|
|||
pending_res = self.app.AsyncResult(uuid())
|
||||
self.assertFalse(pending_res.successful())
|
||||
|
||||
def test_raising(self):
|
||||
notb = self.app.AsyncResult(self.task3['id'])
|
||||
withtb = self.app.AsyncResult(self.task5['id'])
|
||||
|
||||
self.assertRaises(KeyError, notb.get)
|
||||
try:
|
||||
withtb.get()
|
||||
except KeyError:
|
||||
tb = traceback.format_exc()
|
||||
self.assertNotIn(' File "foo.py", line 2, in foofunc', tb)
|
||||
self.assertNotIn(' File "bar.py", line 3, in barfunc', tb)
|
||||
self.assertIn("KeyError: 'blue'", tb)
|
||||
else:
|
||||
raise AssertionError('Did not raise KeyError.')
|
||||
try:
|
||||
old = self.app.conf.remote_tracebacks
|
||||
self.app.conf.remote_tracebacks = True
|
||||
try:
|
||||
withtb.get()
|
||||
except KeyError:
|
||||
tb = traceback.format_exc()
|
||||
self.assertIn(' File "foo.py", line 2, in foofunc', tb)
|
||||
self.assertIn(' File "bar.py", line 3, in barfunc', tb)
|
||||
self.assertIn("KeyError: 'blue'", tb)
|
||||
else:
|
||||
raise AssertionError('Did not raise KeyError.')
|
||||
finally:
|
||||
self.app.conf.remote_tracebacks = old
|
||||
|
||||
def test_str(self):
|
||||
ok_res = self.app.AsyncResult(self.task1['id'])
|
||||
ok2_res = self.app.AsyncResult(self.task2['id'])
|
||||
|
|
|
@ -367,6 +367,20 @@ propagate exceptions.
|
|||
|
||||
It's the same as always running ``apply()`` with ``throw=True``.
|
||||
|
||||
.. setting:: task_remote_tracebacks
|
||||
|
||||
task_remote_tracebacks
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If enabled task results will include the workers stack when reraising task errors.
|
||||
|
||||
This requires the :pypi:`tblib` library, which can be installed using
|
||||
:command:`pip`:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ pip install 'tblib>=1.3.0'
|
||||
|
||||
.. setting:: task_ignore_result
|
||||
|
||||
task_ignore_result
|
||||
|
@ -559,6 +573,7 @@ Result serialization format. Default is ``pickle``. See
|
|||
:ref:`calling-serializers` for information about supported
|
||||
serialization formats.
|
||||
|
||||
|
||||
.. setting:: result_compression
|
||||
|
||||
result_compression
|
||||
|
|
|
@ -78,6 +78,9 @@ Transports and Backends
|
|||
:celery[sqs]:
|
||||
for using Amazon SQS as a message transport (*experimental*).
|
||||
|
||||
:celery[tblib]
|
||||
for using the :setting:`task_remote_tracebacks` feature.
|
||||
|
||||
:celery[memcache]:
|
||||
for using memcached as a result backend (using pylibmc)
|
||||
|
||||
|
|
|
@ -749,7 +749,6 @@ A new builtin task (`celery.accumulate` was added for this purpose)
|
|||
|
||||
Closes #817
|
||||
|
||||
|
||||
Optimized Beat implementation
|
||||
=============================
|
||||
|
||||
|
@ -774,6 +773,16 @@ Contributed by Dmitry Malinovsky.
|
|||
|
||||
# 75246714dd11e6c463b9dc67f4311690643bff24
|
||||
|
||||
Remote Task Tracebacks
|
||||
======================
|
||||
|
||||
The new :setting:`task_remote_tracebacks` will make task tracebacks more
|
||||
useful by injecting the stack of the remote worker.
|
||||
|
||||
This feature requires the additional :pypi:`tblib` library.
|
||||
|
||||
Contributed by Ionel Cristian Mărieș.
|
||||
|
||||
Async Result API
|
||||
================
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
tblib>=1.3.0
|
2
setup.py
2
setup.py
|
@ -194,7 +194,7 @@ features = set([
|
|||
'auth', 'cassandra', 'elasticsearch', 'memcache', 'pymemcache',
|
||||
'couchbase', 'threads', 'eventlet', 'gevent', 'msgpack', 'yaml',
|
||||
'redis', 'mongodb', 'sqs', 'couchdb', 'riak', 'beanstalk', 'zookeeper',
|
||||
'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq',
|
||||
'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib'
|
||||
])
|
||||
extras_require = dict((x, extras(x + '.txt')) for x in features)
|
||||
extra['extras_require'] = extras_require
|
||||
|
|
Загрузка…
Ссылка в новой задаче