This commit is contained in:
Dave Dash 2012-02-09 11:16:43 -08:00
Родитель 0125410d33
Коммит f7e7e80ae5
12 изменённых файлов: 416 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,85 @@
Metadata-Version: 1.0
Name: celery-haystack
Version: 0.4
Summary: An app for integrating Celery with Haystack.
Home-page: http://celery-haystack.rtfd.org/
Author: Jannis Leidel
Author-email: jannis@leidel.info
License: BSD
Description: ===============
celery-haystack
===============
This Django app allows you to utilize Celery for automatically updating and
deleting objects in a Haystack_ search index.
Requirements
------------
* Django 1.2+
* Haystack_ `1.2.X`_ *or* `2.0.X`_
* Celery_ 2.X
You also need to install your choice of one of the supported search engines
for Haystack and one of the supported backends for Celery.
.. _Haystack: http://haystacksearch.org
.. _`1.2.X`: http://pypi.python.org/pypi/django-haystack/1.2.4
.. _`2.0.X`: https://github.com/toastdriven/django-haystack/tree/master
Installation
------------
Use your favorite Python package manager to install the app from PyPI, e.g.::
pip install celery-haystack
By default a few dependencies will automatically be installed:
- django-appconf_ -- An app to gracefully handle application settings.
- versiontools_ -- A library to help staying compatible to `PEP 386`_.
.. _django-appconf: http://pypi.python.org/pypi/django-appconf
.. _versiontools: http://pypi.python.org/pypi/versiontools
.. _`PEP 386`: http://www.python.org/dev/peps/pep-0386/
Setup
-----
1. Add ``'celery_haystack'`` to ``INSTALLED_APPS``.
2. Alter all of your ``SearchIndex`` subclasses to inherit from
``celery_haystack.indexes.CelerySearchIndex`` (as well as
``haystack.indexes.Indexable``).
3. Ensure your Celery instance is running.
Thanks
------
This app is a blatant rip-off of Daniel Lindsley's queued_search_
app but uses Ask Solem Hoel's Celery_ instead of the equally awesome
queues_ library by Matt Croyden.
.. _queued_search: https://github.com/toastdriven/queued_search/
.. _Celery: http://celeryproject.org/
.. _queues: http://code.google.com/p/queues/
Issues
------
Please use the `Github issue tracker`_ for any bug reports or feature
requests.
.. _`Github issue tracker`: https://github.com/ennio/celery-haystack/issues
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Environment :: Web Environment
Classifier: Framework :: Django
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Topic :: Utilities

Просмотреть файл

@ -0,0 +1,30 @@
AUTHORS
MANIFEST.in
README.rst
setup.cfg
setup.py
tox.ini
celery_haystack/__init__.py
celery_haystack/conf.py
celery_haystack/indexes.py
celery_haystack/models.py
celery_haystack/tasks.py
celery_haystack/utils.py
celery_haystack.egg-info/PKG-INFO
celery_haystack.egg-info/SOURCES.txt
celery_haystack.egg-info/dependency_links.txt
celery_haystack.egg-info/requires.txt
celery_haystack.egg-info/top_level.txt
docs/Makefile
docs/changelog.rst
docs/conf.py
docs/index.rst
docs/make.bat
tests/__init__.py
tests/models.py
tests/search_indexes.py
tests/search_sites.py
tests/tests.py
tests/settings/__init__.py
tests/settings/v1.py
tests/settings/v2.py

Просмотреть файл

@ -0,0 +1 @@

Просмотреть файл

@ -0,0 +1,18 @@
../celery_haystack/__init__.py
../celery_haystack/conf.py
../celery_haystack/indexes.py
../celery_haystack/models.py
../celery_haystack/tasks.py
../celery_haystack/utils.py
../celery_haystack/__init__.pyc
../celery_haystack/conf.pyc
../celery_haystack/indexes.pyc
../celery_haystack/models.pyc
../celery_haystack/tasks.pyc
../celery_haystack/utils.pyc
./
dependency_links.txt
PKG-INFO
requires.txt
SOURCES.txt
top_level.txt

Просмотреть файл

@ -0,0 +1 @@
django-appconf >= 0.4.1

Просмотреть файл

@ -0,0 +1 @@
celery_haystack

Просмотреть файл

@ -0,0 +1,2 @@
# following PEP 386, versiontools will pick it up
__version__ = (0, 4, 0, "final", 0)

Просмотреть файл

@ -0,0 +1,29 @@
from django.conf import settings
from haystack import constants
from appconf import AppConf
class CeleryHaystack(AppConf):
DEFAULT_ALIAS = None
RETRY_DELAY = 5 * 60
MAX_RETRIES = 1
DEFAULT_TASK = 'celery_haystack.tasks.CeleryHaystackSignalHandler'
COMMAND_BATCH_SIZE = None
COMMAND_AGE = None
COMMAND_REMOVE = False
COMMAND_WORKERS = 0
COMMAND_APPS = []
COMMAND_VERBOSITY = 1
def configure_default_alias(self, value):
return value or getattr(constants, 'DEFAULT_ALIAS', None)
def configure(self):
data = {}
for name, value in self.configured_data.items():
if name in ('RETRY_DELAY', 'MAX_RETRIES',
'COMMAND_WORKERS', 'COMMAND_VERBOSITY'):
value = int(value)
data[name] = value
return data

Просмотреть файл

@ -0,0 +1,58 @@
from django.db.models import signals
from haystack import indexes
from haystack.utils import get_identifier
from celery_haystack.utils import get_update_task
class CelerySearchIndex(indexes.SearchIndex):
"""
A ``SearchIndex`` subclass that enqueues updates/deletes for later
processing using Celery.
"""
def __init__(self, *args, **kwargs):
super(CelerySearchIndex, self).__init__(*args, **kwargs)
self.task_cls = get_update_task()
self.has_get_model = hasattr(self, 'get_model')
def handle_model(self, model):
if model is None and self.has_get_model:
return self.get_model()
return model
# We override the built-in _setup_* methods to connect the enqueuing
# operation.
def _setup_save(self, model=None):
model = self.handle_model(model)
signals.post_save.connect(self.enqueue_save, sender=model)
def _setup_delete(self, model=None):
model = self.handle_model(model)
signals.post_delete.connect(self.enqueue_delete, sender=model)
def _teardown_save(self, model=None):
model = self.handle_model(model)
signals.post_save.disconnect(self.enqueue_save, sender=model)
def _teardown_delete(self, model=None):
model = self.handle_model(model)
signals.post_delete.disconnect(self.enqueue_delete, sender=model)
def enqueue_save(self, instance, **kwargs):
return self.enqueue('update', instance)
def enqueue_delete(self, instance, **kwargs):
return self.enqueue('delete', instance)
def enqueue(self, action, instance):
"""
Shoves a message about how to update the index into the queue.
This is a standardized string, resembling something like::
``notes.note.23``
# ...or...
``weblog.entry.8``
"""
return self.task_cls.delay(action, get_identifier(instance))

Просмотреть файл

Просмотреть файл

@ -0,0 +1,171 @@
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command
from django.db.models.loading import get_model
from celery.task import Task
from celery_haystack.conf import settings
try:
from haystack import connections
index_holder = connections['default'].get_unified_index()
from haystack.exceptions import NotHandled as IndexNotFoundException
legacy = False
except ImportError:
try:
from haystack import site as index_holder
from haystack.exceptions import NotRegistered as IndexNotFoundException
legacy = True
except ImportError, e:
raise ImproperlyConfigured("Haystack couldn't be imported: %s" % e)
class CeleryHaystackSignalHandler(Task):
using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS
max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES
default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY
def split_identifier(self, identifier, **kwargs):
"""
Break down the identifier representing the instance.
Converts 'notes.note.23' into ('notes.note', 23).
"""
bits = identifier.split('.')
if len(bits) < 2:
logger = self.get_logger(**kwargs)
logger.error("Unable to parse object "
"identifer '%s'. Moving on..." % identifier)
return (None, None)
pk = bits[-1]
# In case Django ever handles full paths...
object_path = '.'.join(bits[:-1])
return (object_path, pk)
def get_model_class(self, object_path, **kwargs):
"""
Fetch the model's class in a standarized way.
"""
bits = object_path.split('.')
app_name = '.'.join(bits[:-1])
classname = bits[-1]
model_class = get_model(app_name, classname)
if model_class is None:
logger = self.get_logger(**kwargs)
logger.error("Could not load model "
"from '%s'. Moving on..." % object_path)
return None
return model_class
def get_instance(self, model_class, pk, **kwargs):
"""
Fetch the instance in a standarized way.
"""
logger = self.get_logger(**kwargs)
try:
instance = model_class.objects.get(pk=int(pk))
except model_class.DoesNotExist:
logger.error("Couldn't load model instance "
"with pk #%s. Somehow it went missing?" % pk)
return None
except model_class.MultipleObjectsReturned:
logger.error("More than one object with pk #%s. Oops?" % pk)
return None
return instance
def get_index(self, model_class, **kwargs):
"""
Fetch the model's registered ``SearchIndex`` in a standarized way.
"""
logger = self.get_logger(**kwargs)
try:
return index_holder.get_index(model_class)
except IndexNotFoundException:
logger.error("Couldn't find a SearchIndex for %s." % model_class)
return None
def get_handler_options(self, **kwargs):
options = {}
if legacy:
options['using'] = self.using
return options
def run(self, action, identifier, **kwargs):
"""
Trigger the actual index handler depending on the
given action ('update' or 'delete').
"""
logger = self.get_logger(**kwargs)
# First get the object path and pk (e.g. ('notes.note', 23))
object_path, pk = self.split_identifier(identifier, **kwargs)
if object_path is None or pk is None:
logger.error("Skipping.")
return
# Then get the model class for the object path
model_class = self.get_model_class(object_path, **kwargs)
current_index = self.get_index(model_class, **kwargs)
if action == 'delete':
# If the object is gone, we'll use just the identifier against the
# index.
try:
handler_options = self.get_handler_options(**kwargs)
current_index.remove_object(identifier, **handler_options)
except Exception, exc:
logger.error(exc)
self.retry([action, identifier], kwargs, exc=exc)
else:
logger.debug("Deleted '%s' from index" % identifier)
return
elif action == 'update':
# and the instance of the model class with the pk
instance = self.get_instance(model_class, pk, **kwargs)
if instance is None:
logger.debug("Didn't update index for '%s'" % identifier)
return
# Call the appropriate handler of the current index and
# handle exception if neccessary
logger.debug("Indexing '%s'." % instance)
try:
handler_options = self.get_handler_options(**kwargs)
current_index.update_object(instance, **handler_options)
except Exception, exc:
logger.error(exc)
self.retry([action, identifier], kwargs, exc=exc)
else:
logger.debug("Updated index with '%s'" % instance)
else:
logger.error("Unrecognized action '%s'. Moving on..." % action)
self.retry([action, identifier], kwargs, exc=exc)
class CeleryHaystackUpdateIndex(Task):
"""
A celery task class to be used to call the update_index management
command from Celery.
"""
def run(self, apps=None, **kwargs):
logger = self.get_logger(**kwargs)
logger.info("Starting update index")
# Run the update_index management command
defaults = {
'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
'age': settings.CELERY_HAYSTACK_COMMAND_AGE,
'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE,
'using': settings.CELERY_HAYSTACK_DEFAULT_ALIAS,
'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS,
'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
}
defaults.update(kwargs)
if apps is None:
apps = settings.CELERY_HAYSTACK_COMMAND_APPS
call_command('update_index', *apps, **defaults)
logger.info("Finishing update index")

Просмотреть файл

@ -0,0 +1,20 @@
from django.core.exceptions import ImproperlyConfigured
from django.utils.importlib import import_module
from celery_haystack.conf import settings
def get_update_task(task_path=None):
import_path = task_path or settings.CELERY_HAYSTACK_DEFAULT_TASK
module, attr = import_path.rsplit('.', 1)
try:
mod = import_module(module)
except ImportError, e:
raise ImproperlyConfigured('Error importing module %s: "%s"' %
(module, e))
try:
Task = getattr(mod, attr)
except AttributeError:
raise ImproperlyConfigured('Module "%s" does not define a "%s" '
'class.' % (module, attr))
return Task()