Issue 102. Fix django tasks code

This fixes the elasticutils.contrib.django.tasks code, updates the
docs, and adds tests so if it fails in the future, I'll know.
This commit is contained in:
Will Kahn-Greene 2013-03-18 18:25:17 -04:00
Родитель 0b49f9cf09
Коммит 6bd2c409df
4 изменённых файлов: 111 добавлений и 18 удалений

Просмотреть файл

@ -3,38 +3,64 @@ import logging
from django.conf import settings
from celery.decorators import task
from elasticutils.contrib.django import get_es
log = logging.getLogger('elasticutils')
@task
def index_objects(model, ids, **kw):
"""Models can asynchronously update their ES index.
def index_objects(mapping_type, ids, **kw):
"""Index documents of a specified mapping type.
If a model extends SearchMixin, it can add a post_save hook like so::
This allows for asynchronous indexing.
If a mapping_type extends Indexable, you can add a ``post_save``
hook for the model that it's based on like this::
@receiver(dbsignals.post_save, sender=MyModel)
def update_search_index(sender, instance, **kw):
def update_in_index(sender, instance, **kw):
from elasticutils.contrib.django import tasks
tasks.index_objects.delay(sender, [instance.id])
tasks.index_objects.delay(MyMappingType, [instance.id])
"""
if settings.ES_DISABLED:
return
es = get_es()
log.info('Indexing objects %s-%s. [%s]' % (ids[0], ids[-1], len(ids)))
qs = model.objects.filter(id__in=ids)
for item in qs:
model.index(item.fields(), bulk=True, id=item.id)
es.flush_bulk(forced=True)
log.debug('Indexing objects {0}-{1}. [{2}]'.format(
ids[0], ids[-1], len(ids)))
# Get the model this mapping type is based on.
model = mapping_type.get_model()
# Retrieve all the objects that we're going to index and do it in
# bulk.
documents = []
for obj in model.objects.filter(id__in=ids):
try:
documents.append(mapping_type.extract_document(obj.id, obj))
except Exception as exc:
print 'GAH!', repr(exc)
log.exception('Unable to extract document {0}'.format(obj))
mapping_type.bulk_index(documents, id_field='id')
@task
def unindex_objects(model, ids, **kw):
def unindex_objects(mapping_type, ids, **kw):
"""Remove documents of a specified mapping_type from the index.
This allows for asynchronous deleting.
If a mapping_type extends Indexable, you can add a ``pre_delete``
hook for the model that it's based on like this::
@receiver(dbsignals.pre_delete, sender=MyModel)
def remove_from_index(sender, instance, **kw):
from elasticutils.contrib.django import tasks
tasks.unindex_objects.delay(MyMappingType, [instance.id])
"""
if settings.ES_DISABLED:
return
for id in ids:
log.info('Removing object [%s.%d] from search index.' % (model, id))
elasticutils.get_es().delete(model._get_index(), model._meta.db_table, id)
for id_ in ids:
mapping_type.unindex(id_)

Просмотреть файл

@ -28,6 +28,7 @@ class FakeModel(object):
objects = Manager()
def __init__(self, **kw):
self._doc = kw
for key in kw:
setattr(self, key, kw[key])
_model_cache.append(self)
@ -37,3 +38,11 @@ class FakeDjangoMappingType(DjangoMappingType, Indexable):
@classmethod
def get_model(cls):
return FakeModel
@classmethod
def extract_document(cls, obj_id, obj=None):
if obj is None:
raise ValueError('I\'m a dumb mock object and I have no idea '
'what to do with these args.')
return obj._doc

Просмотреть файл

@ -23,8 +23,10 @@ try:
from elasticutils.contrib.django import (
S, F, get_es, InvalidFieldActionError)
from elasticutils.contrib.django.tasks import (
index_objects, unindex_objects)
from elasticutils.tests.django_utils import (
FakeDjangoMappingType, FakeModel)
FakeDjangoMappingType, FakeModel, reset_model_cache)
except ImportError:
SKIP_TESTS = True
@ -313,3 +315,56 @@ class IndexableTest(DjangoElasticTestCase):
# Query it to make sure they're there.
eq_(len(S(FakeDjangoMappingType).query(name__prefix='odin')), 1)
eq_(len(S(FakeDjangoMappingType).query(name__prefix='erik')), 1)
def require_celery_or_skip(fun):
@wraps(fun)
def _require_celery_or_skip(*args, **kwargs):
try:
import celery
except ImportError:
raise SkipTest
return fun(*args, **kwargs)
return _require_celery_or_skip
class TestTasks(DjangoElasticTestCase):
index_name = 'elasticutilstest'
@classmethod
def get_es(cls):
return get_es()
def setUp(self):
super(TestTasks, self).setUp()
if self.skip_tests or SKIP_TESTS:
return
TestTasks.create_index()
reset_model_cache()
def tearDown(self):
super(TestTasks, self).tearDown()
if self.skip_tests or SKIP_TESTS:
return
TestTasks.cleanup_index()
@require_celery_or_skip
def test_tasks(self):
documents = [
{'id': 1, 'name': 'odin skullcrusher'},
{'id': 2, 'name': 'heimdall kneebiter'},
{'id': 3, 'name': 'erik rose'}
]
for doc in documents:
FakeModel(**doc)
# Test index_objects task
index_objects(FakeDjangoMappingType, [1, 2, 3])
FakeDjangoMappingType.refresh_index()
eq_(FakeDjangoMappingType.search().count(), 3)
# Test unindex_objects task
unindex_objects(FakeDjangoMappingType, [1, 2, 3])
FakeDjangoMappingType.refresh_index()
eq_(FakeDjangoMappingType.search().count(), 0)

Просмотреть файл

@ -1,3 +1,6 @@
ES_URLS = ['http://localhost:9200']
ES_INDEXES = {'default': ['elasticutilstest']}
ES_TIMEOUT = 10
ES_DISABLED = False
CELERY_ALWAYS_EAGER = True