diff --git a/celery/signals.py b/celery/signals.py index bfc8240e3..c864a1b64 100644 --- a/celery/signals.py +++ b/celery/signals.py @@ -54,7 +54,7 @@ task_rejected = Signal(providing_args=[ 'message', 'exc', ]) task_unknown = Signal(providing_args=[ - 'message', 'exc', + 'message', 'exc', 'name', 'id', ]) celeryd_init = Signal(providing_args=['instance', 'conf', 'options']) celeryd_after_setup = Signal(providing_args=['instance', 'conf']) diff --git a/celery/worker/consumer.py b/celery/worker/consumer.py index d24710879..984826518 100644 --- a/celery/worker/consumer.py +++ b/celery/worker/consumer.py @@ -439,10 +439,17 @@ class Consumer(object): def on_unknown_task(self, body, message, exc): error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True) + id_, name = message.headers['id'], message.headers['task'] message.reject_log_error(logger, self.connection_errors) - self.app.backend.mark_as_failure( - message.headers['id'], NotRegistered(message.headers['task'])) - signals.task_unknown.send(sender=self, message=message, exc=exc) + self.app.backend.mark_as_failure(id_, NotRegistered(name)) + if self.event_dispatcher: + self.event_dispatcher.send( + 'task-failed', uuid=id_, + exception='NotRegistered({0!r})'.format(name), + ) + signals.task_unknown.send( + sender=self, message=message, exc=exc, name=name, id=id_, + ) def on_invalid_task(self, body, message, exc): error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True) diff --git a/docs/userguide/signals.rst b/docs/userguide/signals.rst index 9e48e9648..db5c1eb65 100644 --- a/docs/userguide/signals.rst +++ b/docs/userguide/signals.rst @@ -311,6 +311,14 @@ Sender is the worker :class:`~celery.worker.consumer.Consumer`. Provides arguments: +* name + + Name of task not found in registry. + +* id + + The task id found in the message. + * message Raw message object.