Mirror of https://github.com/mozilla/kitsune.git
[bug 709080] Refactor ES code into SearchMixin
This reduces the triplification of everything in es_search modules. Also, I introduced ES_INDEXING_TIMEOUT, fixed documentation, and cleaned up some other things while I was poking around. Also also, I nixed all the ES mapping constants. Calling put_mapping with typos will kick up an error, so there's no need to additionally have constants around to prevent typos.
This commit is contained in:
Parent
0dd3794295
Commit
4a0130c0c9
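In practice the refactor boils down to this pattern (a minimal sketch distilled from the diffs below; MyModel and its fields are illustrative, not part of the commit):

    from django.db import models

    from search.models import SearchMixin


    class MyModel(models.Model, SearchMixin):
        title = models.CharField(max_length=255)

        @classmethod
        def get_mapping(cls):
            # Plain dicts replace the old TYPE/STRING/... constants from
            # es_utils; a typo here makes put_mapping complain anyway.
            return {'properties': {
                'id': {'type': 'integer'},
                'title': {'type': 'string', 'analyzer': 'snowball'},
            }}

        def extract_document(self):
            # The dict that gets indexed for this instance.
            return {'id': self.id, 'title': self.title}


    # Register the model so reindexing and doctype stats pick it up.
    MyModel.register_search_model()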
|
@ -1,177 +0,0 @@
|
|||
import elasticutils
|
||||
import logging
|
||||
import pyes
|
||||
import time
|
||||
|
||||
from search.es_utils import (TYPE, INTEGER, STRING, ANALYZED, ANALYZER,
|
||||
SNOWBALL, TERM_VECTOR, YES, STORE, BOOLEAN,
|
||||
INDEX, WITH_POS_OFFSETS, DATE, get_index)
|
||||
|
||||
|
||||
log = logging.getLogger('k.forums.es_search')
|
||||
|
||||
|
||||
def setup_mapping(index):
|
||||
from forums.models import Thread
|
||||
|
||||
mapping = {
|
||||
'properties': {
|
||||
'id': {TYPE: INTEGER},
|
||||
'thread_id': {TYPE: INTEGER},
|
||||
'forum_id': {TYPE: INTEGER},
|
||||
'title': {TYPE: STRING, INDEX: ANALYZED, ANALYZER: SNOWBALL},
|
||||
'is_sticky': {TYPE: BOOLEAN},
|
||||
'is_locked': {TYPE: BOOLEAN},
|
||||
'author_id': {TYPE: INTEGER},
|
||||
'author_ord': {TYPE: STRING},
|
||||
'content': {TYPE: STRING, INDEX: ANALYZED, ANALYZER: SNOWBALL,
|
||||
STORE: YES, TERM_VECTOR: WITH_POS_OFFSETS},
|
||||
'created': {TYPE: DATE},
|
||||
'updated': {TYPE: DATE},
|
||||
'replies': {TYPE: INTEGER}
|
||||
}
|
||||
}
|
||||
|
||||
es = elasticutils.get_es()
|
||||
|
||||
try:
|
||||
es.put_mapping(Thread._meta.db_table, mapping, index)
|
||||
except pyes.exceptions.ElasticSearchException, e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
def extract_thread(thread):
|
||||
"""Extracts interesting thing from a Thread and its Posts"""
|
||||
d = {}
|
||||
d['id'] = thread.id
|
||||
d['forum_id'] = thread.forum.id
|
||||
d['title'] = thread.title
|
||||
d['is_sticky'] = thread.is_sticky
|
||||
d['is_locked'] = thread.is_locked
|
||||
d['created'] = thread.created
|
||||
|
||||
if thread.last_post is not None:
|
||||
d['updated'] = thread.last_post.created
|
||||
else:
|
||||
d['updates'] = None
|
||||
|
||||
d['replies'] = thread.replies
|
||||
|
||||
author_ids = set()
|
||||
author_ords = set()
|
||||
content = []
|
||||
|
||||
for post in thread.post_set.all():
|
||||
author_ids.add(post.author.id)
|
||||
author_ords.add(post.author.username)
|
||||
content.append(post.content)
|
||||
|
||||
d['author_id'] = list(author_ids)
|
||||
d['author_ord'] = list(author_ords)
|
||||
d['content'] = content
|
||||
|
||||
return d
|
||||
|
||||
|
||||
def index_thread(thread, bulk=False, force_insert=False, es=None,
|
||||
refresh=False):
|
||||
from forums.models import Thread
|
||||
|
||||
if es is None:
|
||||
es = elasticutils.get_es()
|
||||
|
||||
index = get_index(Thread)
|
||||
|
||||
try:
|
||||
es.index(thread, index, doc_type=Thread._meta.db_table,
|
||||
id=thread['id'], bulk=bulk, force_insert=force_insert)
|
||||
except pyes.urllib3.TimeoutError:
|
||||
# If we have a timeout, try it again rather than die. If we
|
||||
# have a second one, that will cause everything to die.
|
||||
es.index(thread, index, doc_type=Thread._meta.db_table,
|
||||
id=thread['id'], bulk=bulk, force_insert=force_insert)
|
||||
|
||||
if refresh:
|
||||
es.refresh(timesleep=0)
|
||||
|
||||
|
||||
def unindex_threads(ids):
|
||||
from forums.models import Thread
|
||||
|
||||
es = elasticutils.get_es()
|
||||
index = get_index(Thread)
|
||||
|
||||
for thread_id in ids:
|
||||
try:
|
||||
es.delete(index, doc_type=Thread._meta.db_table, id=thread_id)
|
||||
except pyes.exception.NotFoundException:
|
||||
# If the document isn't in the index, then we ignore it.
|
||||
# TODO: Is that right?
|
||||
pass
|
||||
|
||||
|
||||
def unindex_posts(ids):
|
||||
from forums.models import Post
|
||||
|
||||
for post_id in ids:
|
||||
try:
|
||||
post = Post.objects.get(post_id)
|
||||
index_thread(extract_thread(post.thread))
|
||||
except Post.ObjectNotFound:
|
||||
pass
|
||||
|
||||
|
||||
def reindex_documents(percent=100):
|
||||
"""Iterate over this to update the mapping and index all documents.
|
||||
|
||||
Yields number of documents done.
|
||||
|
||||
Note: This only gets called from the command line. Ergo we do
|
||||
some logging so the user knows what's going on.
|
||||
|
||||
:arg percent: The percentage of questions to index. Defaults to
|
||||
100--e.g. all of them.
|
||||
|
||||
"""
|
||||
from forums.models import Thread
|
||||
from django.conf import settings
|
||||
|
||||
index = get_index(Thread)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
log.info('reindex threads: %s %s', index, Thread._meta.db_table)
|
||||
|
||||
es = pyes.ES(settings.ES_HOSTS, timeout=10.0)
|
||||
|
||||
log.info('setting up mapping....')
|
||||
setup_mapping(index)
|
||||
|
||||
log.info('iterating through threads....')
|
||||
total = Thread.objects.count()
|
||||
to_index = int(total * (percent / 100.0))
|
||||
log.info('total threads: %s (to be indexed %s)', total, to_index)
|
||||
total = to_index
|
||||
|
||||
t = 0
|
||||
for thread in Thread.objects.order_by('id').all():
|
||||
t += 1
|
||||
if t % 1000 == 0:
|
||||
time_to_go = (total - t) * ((time.time() - start_time) / t)
|
||||
if time_to_go < 60:
|
||||
time_to_go = "%d secs" % time_to_go
|
||||
else:
|
||||
time_to_go = "%d min" % (time_to_go / 60)
|
||||
|
||||
log.info('%s/%s... (%s to go)', t, total, time_to_go)
|
||||
es.flush_bulk(forced=True)
|
||||
|
||||
if t > total:
|
||||
break
|
||||
|
||||
index_thread(extract_thread(thread), bulk=True, es=es)
|
||||
yield t
|
||||
|
||||
es.flush_bulk(forced=True)
|
||||
log.info('done!')
|
||||
es.refresh()
|
|
@ -1,7 +1,6 @@
|
|||
import datetime
|
||||
|
||||
from django.db import models
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.db.models.signals import post_save, pre_delete
|
||||
from django.dispatch import receiver
|
||||
|
@ -15,7 +14,7 @@ from sumo.helpers import urlparams, wiki_to_html
|
|||
from sumo.urlresolvers import reverse
|
||||
from sumo.models import ModelBase
|
||||
from search import searcher
|
||||
from search import es_utils
|
||||
from search.models import SearchMixin
|
||||
from search.utils import crc32
|
||||
import waffle
|
||||
|
||||
|
@ -94,7 +93,7 @@ class Forum(NotificationsMixin, ModelBase):
|
|||
self.last_post = _last_post_from(posts, exclude_post=exclude_post)
|
||||
|
||||
|
||||
class Thread(NotificationsMixin, ModelBase):
|
||||
class Thread(NotificationsMixin, ModelBase, SearchMixin):
|
||||
title = models.CharField(max_length=255)
|
||||
forum = models.ForeignKey('Forum')
|
||||
created = models.DateTimeField(default=datetime.datetime.now,
|
||||
|
@ -180,27 +179,85 @@ class Thread(NotificationsMixin, ModelBase):
|
|||
# If self.last_post is None, and this was called from Post.delete,
|
||||
# then Post.delete will erase the thread, as well.
|
||||
|
||||
@classmethod
|
||||
def get_mapping(cls):
|
||||
mapping = {
|
||||
'properties': {
|
||||
'id': {'type': 'integer'},
|
||||
'thread_id': {'type': 'integer'},
|
||||
'forum_id': {'type': 'integer'},
|
||||
'title': {'type': 'string', 'analyzer': 'snowball'},
|
||||
'is_sticky': {'type': 'boolean'},
|
||||
'is_locked': {'type': 'boolean'},
|
||||
'author_id': {'type': 'integer'},
|
||||
'author_ord': {'type': 'string'},
|
||||
'content': {'type': 'string', 'analyzer': 'snowball',
|
||||
'store': 'yes',
|
||||
'term_vector': 'with_positions_offsets'},
|
||||
'created': {'type': 'date'},
|
||||
'updated': {'type': 'date'},
|
||||
'replies': {'type': 'integer'}
|
||||
}
|
||||
}
|
||||
return mapping
|
||||
|
||||
@receiver(post_save, sender=Thread,
|
||||
dispatch_uid='forums.search.index.thread.save')
|
||||
def update_thread_in_index(sender, instance, **kw):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kw.get('raw'):
|
||||
return
|
||||
def extract_document(self):
|
||||
"""Extracts interesting thing from a Thread and its Posts"""
|
||||
d = {}
|
||||
d['id'] = self.id
|
||||
d['forum_id'] = self.forum.id
|
||||
d['title'] = self.title
|
||||
d['is_sticky'] = self.is_sticky
|
||||
d['is_locked'] = self.is_locked
|
||||
d['created'] = self.created
|
||||
|
||||
from forums.tasks import index_threads
|
||||
es_utils.add_index_task(index_threads.delay, (instance.id,))
|
||||
if self.last_post is not None:
|
||||
d['updated'] = self.last_post.created
|
||||
else:
|
||||
d['updates'] = None
|
||||
|
||||
d['replies'] = self.replies
|
||||
|
||||
author_ids = set()
|
||||
author_ords = set()
|
||||
content = []
|
||||
|
||||
for post in self.post_set.all():
|
||||
author_ids.add(post.author.id)
|
||||
author_ords.add(post.author.username)
|
||||
content.append(post.content)
|
||||
|
||||
d['author_id'] = list(author_ids)
|
||||
d['author_ord'] = list(author_ords)
|
||||
d['content'] = content
|
||||
|
||||
return d
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=Thread,
|
||||
dispatch_uid='forums.search.index.thread.delete')
|
||||
def remove_thread_from_index(sender, instance, **kw):
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
# Register this as a model we index in ES.
|
||||
Thread.register_search_model()
|
||||
|
||||
from forums.tasks import unindex_threads
|
||||
unindex_threads([instance.id])
|
||||
|
||||
def _update_t_index(sender, instance, **kw):
|
||||
"""Given a Thread, creates an index task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
def _remove_t_index(sender, instance, **kw):
|
||||
"""Given a Thread, create an unindex task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance
|
||||
obj.__class__.add_unindex_task((obj.id,))
|
||||
|
||||
|
||||
f_t_es_post_save = receiver(
|
||||
post_save, sender=Thread,
|
||||
dispatch_uid='f.t.es.post_save')(_update_t_index)
|
||||
f_t_es_pre_delete = receiver(
|
||||
pre_delete, sender=Thread,
|
||||
dispatch_uid='f.t.es.pre_delete')(_remove_t_index)
|
||||
|
||||
|
||||
class Post(ActionMixin, ModelBase):
|
||||
|
@ -285,26 +342,19 @@ class Post(ActionMixin, ModelBase):
|
|||
return wiki_to_html(self.content)
|
||||
|
||||
|
||||
@receiver(post_save, sender=Post,
|
||||
dispatch_uid='forums.search.index.post.save')
|
||||
def update_post_in_index(sender, instance, **kw):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kw.get('raw'):
|
||||
return
|
||||
|
||||
from forums.tasks import index_threads
|
||||
es_utils.add_index_task(index_threads.delay, (instance.thread_id,))
|
||||
def _update_post_index(sender, instance, **kw):
|
||||
"""Given a Post, update the Thread in the index"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance.thread
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=Post,
|
||||
dispatch_uid='forums.search.index.post.delete')
|
||||
def remove_post_from_index(sender, instance, **kw):
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
|
||||
from forums.tasks import index_threads
|
||||
es_utils.add_index_task(index_threads.delay, (instance.thread_id,))
|
||||
f_p_es_post_save = receiver(
|
||||
post_save, sender=Post,
|
||||
dispatch_uid='f_p_es_post_save')(_update_post_index)
|
||||
f_p_es_pre_delete = receiver(
|
||||
pre_delete, sender=Post,
|
||||
dispatch_uid='f_p_es_pre_delete')(_update_post_index)
|
||||
|
||||
|
||||
def discussion_searcher(request):
|
||||
|
|
|
@ -1,215 +0,0 @@
|
|||
import elasticutils
|
||||
import logging
|
||||
import pyes
|
||||
import time
|
||||
|
||||
from search.es_utils import (TYPE, LONG, STRING, ANALYZER,
|
||||
SNOWBALL, TERM_VECTOR, STORE, YES, BOOLEAN,
|
||||
WITH_POS_OFFSETS, DATE, INTEGER, get_index)
|
||||
|
||||
|
||||
log = logging.getLogger('k.questions.es_search')
|
||||
|
||||
|
||||
def setup_mapping(index):
|
||||
from questions.models import Question
|
||||
|
||||
mapping = {
|
||||
'properties': {
|
||||
'id': {TYPE: LONG},
|
||||
'question_id': {TYPE: LONG},
|
||||
'title': {TYPE: STRING, ANALYZER: SNOWBALL},
|
||||
'question_content':
|
||||
{TYPE: STRING, ANALYZER: SNOWBALL,
|
||||
# TODO: Stored because originally, this is the only field we
|
||||
# were excerpting on. Standardize one way or the other.
|
||||
STORE: YES, TERM_VECTOR: WITH_POS_OFFSETS},
|
||||
'answer_content':
|
||||
{TYPE: STRING, ANALYZER: SNOWBALL},
|
||||
'replies': {TYPE: INTEGER},
|
||||
'is_solved': {TYPE: BOOLEAN},
|
||||
'is_locked': {TYPE: BOOLEAN},
|
||||
'has_answers': {TYPE: BOOLEAN},
|
||||
'has_helpful': {TYPE: BOOLEAN},
|
||||
'created': {TYPE: DATE},
|
||||
'updated': {TYPE: DATE},
|
||||
'question_creator': {TYPE: STRING},
|
||||
'answer_creator': {TYPE: STRING},
|
||||
'question_votes': {TYPE: INTEGER},
|
||||
'answer_votes': {TYPE: INTEGER},
|
||||
'tag': {TYPE: STRING}
|
||||
}
|
||||
}
|
||||
|
||||
es = elasticutils.get_es()
|
||||
|
||||
# TODO: If the mapping is there already and we do a put_mapping,
|
||||
# does that stomp on the existing mapping or raise an error?
|
||||
try:
|
||||
es.put_mapping(Question._meta.db_table, mapping, index)
|
||||
except pyes.exceptions.ElasticSearchException, e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
def extract_question(question):
|
||||
"""Extracts indexable attributes from a Question and its answers."""
|
||||
question_data = {}
|
||||
|
||||
question_data['id'] = question.id
|
||||
|
||||
question_data['title'] = question.title
|
||||
question_data['question_content'] = question.content
|
||||
question_data['replies'] = question.num_answers
|
||||
question_data['is_solved'] = bool(question.solution_id)
|
||||
question_data['is_locked'] = question.is_locked
|
||||
question_data['has_answers'] = bool(question.num_answers)
|
||||
|
||||
question_data['created'] = question.created
|
||||
question_data['updated'] = question.updated
|
||||
|
||||
question_data['question_creator'] = question.creator.username
|
||||
question_data['question_votes'] = question.num_votes_past_week
|
||||
|
||||
question_data['tag'] = [tag['name'] for tag in question.tags.values()]
|
||||
|
||||
# Array of strings.
|
||||
answer_content = []
|
||||
|
||||
# has_helpful is true if at least one answer is marked as
|
||||
# helpful.
|
||||
has_helpful = False
|
||||
|
||||
# answer_creator is the set of all answer creator user names.
|
||||
answer_creator = set()
|
||||
|
||||
# answer_votes is the sum of votes for all of the answers.
|
||||
answer_votes = 0
|
||||
|
||||
for ans in question.answers.all():
|
||||
answer_content.append(ans.content)
|
||||
has_helpful = has_helpful or bool(ans.num_helpful_votes)
|
||||
answer_creator.add(ans.creator.username)
|
||||
answer_votes += ans.upvotes
|
||||
|
||||
question_data['answer_content'] = answer_content
|
||||
question_data['has_helpful'] = has_helpful
|
||||
question_data['answer_creator'] = list(answer_creator)
|
||||
question_data['answer_votes'] = answer_votes
|
||||
|
||||
return question_data
|
||||
|
||||
|
||||
def index_doc(doc, bulk=False, force_insert=False, es=None, refresh=False):
|
||||
from questions.models import Question
|
||||
|
||||
if es is None:
|
||||
es = elasticutils.get_es()
|
||||
|
||||
index = get_index(Question)
|
||||
|
||||
try:
|
||||
es.index(doc, index, doc_type=Question._meta.db_table,
|
||||
id=doc['id'], bulk=bulk, force_insert=force_insert)
|
||||
except pyes.urllib3.TimeoutError:
|
||||
# If we have a timeout, try it again rather than die. If we
|
||||
# have a second one, that will cause everything to die.
|
||||
es.index(doc, index, doc_type=Question._meta.db_table,
|
||||
id=doc['id'], bulk=bulk, force_insert=force_insert)
|
||||
|
||||
if refresh:
|
||||
es.refresh(timesleep=0)
|
||||
|
||||
|
||||
def unindex_questions(ids):
|
||||
"""Removes Questions from the index."""
|
||||
from questions.models import Question
|
||||
|
||||
es = elasticutils.get_es()
|
||||
index = get_index(Question)
|
||||
|
||||
for question_id in ids:
|
||||
# TODO wrap this in a try/except--amongst other things, this will
|
||||
# only be in the index if the Question had no Answers.
|
||||
try:
|
||||
es.delete(index, doc_type=Question._meta.db_table,
|
||||
id=question_id)
|
||||
except pyes.exceptions.NotFoundException:
|
||||
# If the document isn't in the index, then we ignore it.
|
||||
# TODO: Is that right?
|
||||
pass
|
||||
|
||||
|
||||
def unindex_answers(ids):
|
||||
"""Removes Answers from the index.
|
||||
|
||||
:arg ids: list of question ids
|
||||
|
||||
"""
|
||||
# Answers are rolled up in Question documents, so we reindex the
|
||||
# Question.
|
||||
from questions.models import Question
|
||||
|
||||
for question_id in ids:
|
||||
try:
|
||||
# TODO: test the case where we delete the question
|
||||
# twice.
|
||||
question = Question.objects.get(id=question_id)
|
||||
index_doc(extract_question(question))
|
||||
except Question.ObjectDoesNotExist:
|
||||
pass
|
||||
|
||||
|
||||
def reindex_questions(percent=100):
|
||||
"""Iterate over this to update the mapping and index all documents.
|
||||
|
||||
Yields number of documents done.
|
||||
|
||||
Note: This gets run from the command line, so we log stuff to let
|
||||
the user know what's going on.
|
||||
|
||||
:arg percent: The percentage of questions to index. Defaults to
|
||||
100--e.g. all of them.
|
||||
|
||||
"""
|
||||
from questions.models import Question
|
||||
from django.conf import settings
|
||||
|
||||
index = get_index(Question)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
log.info('reindex questions: %s %s', index,
|
||||
Question._meta.db_table)
|
||||
|
||||
es = pyes.ES(settings.ES_HOSTS, timeout=10.0)
|
||||
|
||||
log.info('setting up mapping....')
|
||||
setup_mapping(index)
|
||||
|
||||
log.info('iterating through questions....')
|
||||
total = Question.objects.count()
|
||||
to_index = int(total * (percent / 100.0))
|
||||
log.info('total questions: %s (to be indexed: %s)', total, to_index)
|
||||
total = to_index
|
||||
|
||||
t = 0
|
||||
for q in Question.objects.order_by('id').all():
|
||||
t += 1
|
||||
if t % 1000 == 0:
|
||||
time_to_go = (total - t) * ((time.time() - start_time) / t)
|
||||
if time_to_go < 60:
|
||||
time_to_go = "%d secs" % time_to_go
|
||||
else:
|
||||
time_to_go = "%d min" % (time_to_go / 60)
|
||||
log.info('%s/%s... (%s to go)', t, total, time_to_go)
|
||||
es.flush_bulk(forced=True)
|
||||
|
||||
if t > total:
|
||||
break
|
||||
|
||||
index_doc(extract_question(q), bulk=True, es=es)
|
||||
yield t
|
||||
|
||||
es.flush_bulk(forced=True)
|
||||
log.info('done!')
|
||||
es.refresh()
|
|
@ -24,9 +24,9 @@ from questions.karma_actions import (AnswerAction, FirstAnswerAction,
|
|||
SolutionAction)
|
||||
from questions.question_config import products
|
||||
from questions.tasks import (update_question_votes, update_answer_pages,
|
||||
log_answer, index_questions, unindex_questions)
|
||||
log_answer)
|
||||
from search import searcher
|
||||
from search import es_utils
|
||||
from search.models import SearchMixin
|
||||
from search.utils import crc32
|
||||
from sumo.helpers import urlparams
|
||||
from sumo.models import ModelBase
|
||||
|
@ -41,7 +41,7 @@ from upload.models import ImageAttachment
|
|||
log = logging.getLogger('k.questions')
|
||||
|
||||
|
||||
class Question(ModelBase, BigVocabTaggableMixin):
|
||||
class Question(ModelBase, BigVocabTaggableMixin, SearchMixin):
|
||||
"""A support question."""
|
||||
title = models.CharField(max_length=255)
|
||||
creator = models.ForeignKey(User, related_name='questions')
|
||||
|
@ -282,51 +282,122 @@ class Question(ModelBase, BigVocabTaggableMixin):
|
|||
cache.add(cache_key, tags)
|
||||
return tags
|
||||
|
||||
@classmethod
|
||||
def get_mapping(cls):
|
||||
mapping = {
|
||||
'properties': {
|
||||
'id': {'type': 'long'},
|
||||
'question_id': {'type': 'long'},
|
||||
'title': {'type': 'string', 'analyzer': 'snowball'},
|
||||
'question_content':
|
||||
{'type': 'string', 'analyzer': 'snowball',
|
||||
# TODO: Stored because originally, this is the
|
||||
# only field we were excerpting on. Standardize
|
||||
# one way or the other.
|
||||
'store': 'yes', 'term_vector': 'with_positions_offsets'},
|
||||
'answer_content':
|
||||
{'type': 'string', 'analyzer': 'snowball'},
|
||||
'replies': {'type': 'integer'},
|
||||
'is_solved': {'type': 'boolean'},
|
||||
'is_locked': {'type': 'boolean'},
|
||||
'has_answers': {'type': 'boolean'},
|
||||
'has_helpful': {'type': 'boolean'},
|
||||
'created': {'type': 'date'},
|
||||
'updated': {'type': 'date'},
|
||||
'question_creator': {'type': 'string'},
|
||||
'answer_creator': {'type': 'string'},
|
||||
'question_votes': {'type': 'integer'},
|
||||
'answer_votes': {'type': 'integer'},
|
||||
'tag': {'type': 'string'}
|
||||
}
|
||||
}
|
||||
return mapping
|
||||
|
||||
@receiver(post_save, sender=Question,
|
||||
dispatch_uid='questions.search.index.question.save')
|
||||
def update_question_in_index(sender, instance, **kw):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kw.get('raw'):
|
||||
return
|
||||
def extract_document(self):
|
||||
"""Extracts indexable attributes from a Question and its answers."""
|
||||
d = {}
|
||||
|
||||
es_utils.add_index_task(index_questions.delay, (instance.id,))
|
||||
d['id'] = self.id
|
||||
|
||||
d['title'] = self.title
|
||||
d['question_content'] = self.content
|
||||
d['replies'] = self.num_answers
|
||||
d['is_solved'] = bool(self.solution_id)
|
||||
d['is_locked'] = self.is_locked
|
||||
d['has_answers'] = bool(self.num_answers)
|
||||
|
||||
d['created'] = self.created
|
||||
d['updated'] = self.updated
|
||||
|
||||
d['question_creator'] = self.creator.username
|
||||
d['question_votes'] = self.num_votes_past_week
|
||||
|
||||
d['tag'] = [tag['name'] for tag in self.tags.values()]
|
||||
|
||||
# Array of strings.
|
||||
answer_content = []
|
||||
|
||||
# has_helpful is true if at least one answer is marked as
|
||||
# helpful.
|
||||
has_helpful = False
|
||||
|
||||
# answer_creator is the set of all answer creator user names.
|
||||
answer_creator = set()
|
||||
|
||||
# answer_votes is the sum of votes for all of the answers.
|
||||
answer_votes = 0
|
||||
|
||||
for ans in self.answers.all():
|
||||
answer_content.append(ans.content)
|
||||
has_helpful = has_helpful or bool(ans.num_helpful_votes)
|
||||
answer_creator.add(ans.creator.username)
|
||||
answer_votes += ans.upvotes
|
||||
|
||||
d['answer_content'] = answer_content
|
||||
d['has_helpful'] = has_helpful
|
||||
d['answer_creator'] = list(answer_creator)
|
||||
d['answer_votes'] = answer_votes
|
||||
|
||||
return d
|
||||
|
||||
|
||||
@receiver(post_save, sender=TaggedItem,
|
||||
dispatch_uid='questions.search.index.tags.save')
|
||||
def update_question_tags_in_index(sender, instance, **kwargs):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kwargs.get('raw'):
|
||||
return
|
||||
|
||||
es_utils.add_index_task(index_questions.delay,
|
||||
(instance.content_object.id,))
|
||||
# Register this as a model we index in ES.
|
||||
Question.register_search_model()
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=Question,
|
||||
dispatch_uid='questions.search.index.question.delete')
|
||||
def remove_question_from_index(sender, instance, **kw):
|
||||
if (not settings.ES_LIVE_INDEXING or kw.get('raw') or
|
||||
not isinstance(instance.content_object, Question)):
|
||||
return
|
||||
|
||||
unindex_questions([instance.id])
|
||||
def _update_qs_index(sender, instance, **kw):
|
||||
"""Given a Question, creates an index task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=TaggedItem,
|
||||
dispatch_uid='questions.search.index.tags.delete')
|
||||
def update_question_in_index_on_tags_delete(sender, instance, **kwargs):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if (not settings.ES_LIVE_INDEXING or kwargs.get('raw') or
|
||||
not isinstance(instance.content_object, Question)):
|
||||
return
|
||||
def _update_tag_index(sender, instance, **kw):
|
||||
"""Given a TaggedItem for a Question, creates an index task"""
|
||||
obj = instance.content_object
|
||||
if not kw.get('raw') and isinstance(obj, Question):
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
es_utils.add_index_task(index_questions.delay,
|
||||
(instance.content_object.id,))
|
||||
|
||||
def _remove_qs_index(sender, instance, **kw):
|
||||
"""Given a Question, creates an unindex task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance
|
||||
obj.__class__.add_unindex_task((obj.id,))
|
||||
|
||||
|
||||
q_es_post_save = receiver(
|
||||
post_save, sender=Question,
|
||||
dispatch_uid='q.es.post_save')(_update_qs_index)
|
||||
q_es_pre_delete = receiver(
|
||||
pre_delete, sender=Question,
|
||||
dispatch_uid='q.es.pre_delete')(_remove_qs_index)
|
||||
q_tag_es_post_save = receiver(
|
||||
post_save, sender=TaggedItem,
|
||||
dispatch_uid='q.es.post_save')(_update_tag_index)
|
||||
q_tag_es_pre_delete = receiver(
|
||||
pre_delete, sender=TaggedItem,
|
||||
dispatch_uid='q.tag.es.pre_delete')(_update_tag_index)
|
||||
|
||||
|
||||
class QuestionMetaData(ModelBase):
|
||||
|
@ -524,24 +595,19 @@ post_save.connect(answer_connector, sender=Answer,
|
|||
dispatch_uid='question_answer_activity')
|
||||
|
||||
|
||||
@receiver(post_save, sender=Answer,
|
||||
dispatch_uid='questions.search.index.answer.save')
|
||||
def update_answer_in_index(sender, instance, **kw):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kw.get('raw'):
|
||||
return
|
||||
|
||||
es_utils.add_index_task(index_questions.delay, (instance.question_id,))
|
||||
def _update_ans_index(sender, instance, **kw):
|
||||
"""Given an Answer for a Question, create an index task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance.question
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=Answer,
|
||||
dispatch_uid='questions.search.index.answer.delete')
|
||||
def remove_answer_from_index(sender, instance, **kw):
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
|
||||
es_utils.add_index_task(index_questions.delay, (instance.question_id,))
|
||||
q_ans_es_post_save = receiver(
|
||||
post_save, sender=Answer,
|
||||
dispatch_uid='q.ans.es.post_save')(_update_ans_index)
|
||||
q_ans_es_pre_delete = receiver(
|
||||
pre_delete, sender=Answer,
|
||||
dispatch_uid='q.ans.es.pre_delete')(_update_ans_index)
|
||||
|
||||
|
||||
class QuestionVote(ModelBase):
|
||||
|
@ -570,31 +636,26 @@ class AnswerVote(ModelBase):
|
|||
VoteMetadata.objects.create(vote=self, key=key, value=value)
|
||||
|
||||
|
||||
@receiver(post_save, sender=AnswerVote,
|
||||
dispatch_uid='questions.search.index.answervote.save')
|
||||
def update_answervote_in_index(sender, instance, **kw):
|
||||
# TODO: We only need to update the helpful bit. It's possible
|
||||
# we could ignore all AnswerVotes that aren't helpful and if
|
||||
# they're marked as helpful, then update the index. Look into
|
||||
# this.
|
||||
def _update_ansv_index(sender, instance, **kw):
|
||||
"""Given an AnswerVote for an Answer for a Question, creates an
|
||||
unindex task
|
||||
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kw.get('raw'):
|
||||
return
|
||||
|
||||
es_utils.add_index_task(index_questions.delay, (
|
||||
instance.answer.question_id,))
|
||||
"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance.answer.question
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=AnswerVote,
|
||||
dispatch_uid='questions.search.index.answervote.delete')
|
||||
def remove_answervote_from_index(sender, instance, **kw):
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
|
||||
es_utils.add_index_task(index_questions.delay, (
|
||||
instance.answer.question_id,))
|
||||
# TODO: We only need to update the helpful bit. It's possible
|
||||
# we could ignore all AnswerVotes that aren't helpful and if
|
||||
# they're marked as helpful, then update the index. Look into
|
||||
# this.
|
||||
q_av_es_post_save = receiver(
|
||||
post_save, sender=AnswerVote,
|
||||
dispatch_uid='q.av.es.post_save')(_update_ansv_index)
|
||||
q_av_es_pre_delete = receiver(
|
||||
post_save, sender=AnswerVote,
|
||||
dispatch_uid='q.av.es.pre_delete')(_update_ansv_index)
|
||||
|
||||
|
||||
class VoteMetadata(ModelBase):
|
||||
|
@@ -61,6 +61,10 @@ class QuestionUpdateTests(ElasticTestCase):
        eq_(elasticutils.S(Question).count(), 0)

    def test_questions_tags(self):
        """Make sure that adding tags to a Question causes it to
        refresh the index.

        """
        tag = u'hiphop'
        eq_(elasticutils.S(Question).filter(tag=tag).count(), 0)
        q = question(save=True)
@ -1,13 +1,12 @@
|
|||
from itertools import chain, count, izip
|
||||
import logging
|
||||
from threading import local
|
||||
from pprint import pprint
|
||||
import time
|
||||
|
||||
import elasticutils
|
||||
from pprint import pprint
|
||||
import pyes
|
||||
|
||||
from django.conf import settings
|
||||
from django.core import signals
|
||||
|
||||
|
||||
ESTimeoutError = pyes.urllib3.TimeoutError
|
||||
|
@ -15,69 +14,7 @@ ESMaxRetryError = pyes.urllib3.MaxRetryError
|
|||
ESIndexMissingException = pyes.exceptions.IndexMissingException
|
||||
|
||||
|
||||
TYPE = 'type'
|
||||
ANALYZER = 'analyzer'
|
||||
INDEX = 'index'
|
||||
STORE = 'store'
|
||||
TERM_VECTOR = 'term_vector'
|
||||
|
||||
NOT_INDEXED = 'not_indexed'
|
||||
|
||||
LONG = 'long'
|
||||
INTEGER = 'integer'
|
||||
STRING = 'string'
|
||||
BOOLEAN = 'boolean'
|
||||
DATE = 'date'
|
||||
|
||||
ANALYZED = 'analyzed'
|
||||
NOTANALYZED = 'not_analyzed'
|
||||
|
||||
SNOWBALL = 'snowball'
|
||||
|
||||
YES = 'yes'
|
||||
|
||||
WITH_POS_OFFSETS = 'with_positions_offsets'
|
||||
|
||||
|
||||
_local_tasks = local()
|
||||
_local_tasks.es_index_task_set = set()
|
||||
|
||||
|
||||
def add_index_task(fun, *args):
|
||||
"""Adds an index task.
|
||||
|
||||
Note: args and its contents **must** be hashable.
|
||||
|
||||
:arg fun: the function to call
|
||||
:arg args: arguments to the function
|
||||
|
||||
"""
|
||||
_local_tasks.es_index_task_set.add((fun, args))
|
||||
|
||||
|
||||
def generate_tasks(**kwargs):
|
||||
"""Goes through thread local index update tasks set and generates
|
||||
celery tasks for all tasks in the set.
|
||||
|
||||
Because this works off of a set, it naturally de-dupes the tasks,
|
||||
so if four tasks get tossed into the set that are identical, we
|
||||
execute it only once.
|
||||
|
||||
"""
|
||||
lt = _local_tasks
|
||||
for fun, args in lt.es_index_task_set:
|
||||
fun(*args)
|
||||
|
||||
lt.es_index_task_set.clear()
|
||||
|
||||
|
||||
signals.request_finished.connect(generate_tasks)
|
||||
|
||||
|
||||
def get_index(model):
|
||||
"""Returns the index name for this model."""
|
||||
return (settings.ES_INDEXES.get(model._meta.db_table)
|
||||
or settings.ES_INDEXES['default'])
|
||||
log = logging.getLogger('search.es_utils')
|
||||
|
||||
|
||||
def get_doctype_stats():
|
||||
|
@ -86,30 +23,79 @@ def get_doctype_stats():
|
|||
For example:
|
||||
|
||||
>>> get_doctype_stats()
|
||||
{'questions': 1000, 'forums': 1000, 'wiki': 1000}
|
||||
{'questions_question': 14216, 'forums_thread': 419, 'wiki_document': 759}
|
||||
|
||||
:throws pyes.urllib3.MaxRetryError: if it can't connect to elasticsearch
|
||||
:throws pyes.exceptions.IndexMissingException: if the index doesn't exist
|
||||
|
||||
"""
|
||||
# TODO: We have to import these here, otherwise we have an import
|
||||
# loop es_utils -> models.py -> es_utils. This should get fixed by
|
||||
# having the models register themselves as indexable with es_utils
|
||||
# or something like that. Then es_utils won't have to explicitly
|
||||
# know about models.
|
||||
from forums.models import Thread
|
||||
from questions.models import Question
|
||||
from wiki.models import Document
|
||||
from search.models import get_search_models
|
||||
|
||||
stats = {}
|
||||
|
||||
for name, model in (('questions', Question),
|
||||
('forums', Thread),
|
||||
('wiki', Document)):
|
||||
stats[name] = elasticutils.S(model).count()
|
||||
for cls in get_search_models():
|
||||
stats[cls._meta.db_table] = elasticutils.S(cls).count()
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
def reindex_model(cls, percent=100):
|
||||
"""Reindexes all the objects for a single mode.
|
||||
|
||||
Yields number of documents done.
|
||||
|
||||
Note: This gets run from the command line, so we log stuff to let
|
||||
the user know what's going on.
|
||||
|
||||
:arg cls: the model class
|
||||
:arg percent: The percentage of questions to index. Defaults to
|
||||
100--e.g. all of them.
|
||||
|
||||
"""
|
||||
doc_type = cls._meta.db_table
|
||||
index = cls._get_index()
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
log.info('reindex %s into %s index', doc_type, index)
|
||||
|
||||
es = pyes.ES(settings.ES_HOSTS, timeout=settings.ES_INDEXING_TIMEOUT)
|
||||
|
||||
log.info('setting up mapping....')
|
||||
mapping = cls.get_mapping()
|
||||
es.put_mapping(doc_type, mapping, index)
|
||||
|
||||
log.info('iterating through %s....', doc_type)
|
||||
total = cls.objects.count()
|
||||
to_index = int(total * (percent / 100.0))
|
||||
log.info('total %s: %s (to be indexed: %s)', doc_type, total, to_index)
|
||||
total = to_index
|
||||
|
||||
t = 0
|
||||
for obj in cls.objects.order_by('id').all():
|
||||
t += 1
|
||||
if t % 1000 == 0:
|
||||
time_to_go = (total - t) * ((time.time() - start_time) / t)
|
||||
if time_to_go < 60:
|
||||
time_to_go = "%d secs" % time_to_go
|
||||
else:
|
||||
time_to_go = "%d min" % (time_to_go / 60)
|
||||
log.info('%s/%s... (%s to go)', t, total, time_to_go)
|
||||
|
||||
if t % settings.ES_FLUSH_BULK_EVERY == 0:
|
||||
es.flush_bulk()
|
||||
|
||||
if t > total:
|
||||
break
|
||||
|
||||
cls.index(obj.extract_document(), bulk=True, es=es)
|
||||
yield t
|
||||
|
||||
es.flush_bulk(forced=True)
|
||||
log.info('done!')
|
||||
es.refresh()
|
||||
|
||||
|
||||
def es_reindex_with_progress(percent=100):
|
||||
"""Rebuild Elastic indexes as you iterate over yielded progress ratios.
|
||||
|
||||
|
@ -118,55 +104,43 @@ def es_reindex_with_progress(percent=100):
|
|||
development where doing a full reindex takes an hour.
|
||||
|
||||
"""
|
||||
# TODO: We have to import these here, otherwise we have an import
|
||||
# loop es_utils -> models.py -> es_utils. This should get fixed by
|
||||
# having the models register themselves as indexable with es_utils
|
||||
# or something like that. Then es_utils won't have to explicitly
|
||||
# know about models.
|
||||
import forums.es_search
|
||||
from forums.models import Thread
|
||||
import questions.es_search
|
||||
from questions.models import Question
|
||||
import wiki.es_search
|
||||
from wiki.models import Document
|
||||
from search.models import get_search_models
|
||||
|
||||
es = elasticutils.get_es()
|
||||
|
||||
# Go through and delete, then recreate the indexes.
|
||||
for index in settings.ES_INDEXES.values():
|
||||
es.delete_index_if_exists(index)
|
||||
es.create_index_if_missing(index) # Should always be missing.
|
||||
es.create_index(index)
|
||||
|
||||
search_models = get_search_models()
|
||||
|
||||
total = sum([cls.objects.count() for cls in search_models])
|
||||
|
||||
to_index = [reindex_model(cls, percent) for cls in search_models]
|
||||
|
||||
# TODO: Having the knowledge of apps' internals repeated here is lame.
|
||||
total = (Question.objects.count() +
|
||||
Thread.objects.count() +
|
||||
Document.objects.count())
|
||||
return (float(done) / total for done, _ in
|
||||
izip(count(1),
|
||||
chain(questions.es_search.reindex_questions(percent),
|
||||
wiki.es_search.reindex_documents(percent),
|
||||
forums.es_search.reindex_documents(percent))))
|
||||
izip(count(1), chain(*to_index)))
|
||||
|
||||
|
||||
def es_reindex(percent=100):
|
||||
"""Rebuild ElasticSearch indexes."""
|
||||
"""Rebuild ElasticSearch indexes"""
|
||||
[x for x in es_reindex_with_progress(percent) if False]
|
||||
|
||||
|
||||
def es_whazzup():
|
||||
"""Runs cluster_stats on the Elastic system."""
|
||||
# We create a logger because elasticutils uses it.
|
||||
logging.basicConfig()
|
||||
|
||||
"""Runs cluster_stats on the Elastic system"""
|
||||
es = elasticutils.get_es()
|
||||
|
||||
# TODO: It'd be better to show more useful information than raw
|
||||
# cluster_stats.
|
||||
try:
|
||||
pprint(es.cluster_stats())
|
||||
except pyes.urllib3.connectionpool.MaxRetryError:
|
||||
print ('ERROR: Your elasticsearch process is not running or '
|
||||
'ES_HOSTS is set wrong in your settings_local.py file.')
|
||||
log.error('Your elasticsearch process is not running or ES_HOSTS '
|
||||
'is set wrong in your settings_local.py file.')
|
||||
return
|
||||
|
||||
print 'Totals:'
|
||||
log.info('Totals:')
|
||||
for name, count in get_doctype_stats().items():
|
||||
print '* %s: %d' % (name, count)
|
||||
log.info(' * %s: %d', name, count)
|
||||
|
|
@@ -1,3 +1,4 @@
import logging
from django.core.management.base import BaseCommand, CommandError
from optparse import make_option
from search.es_utils import es_reindex
@@ -10,6 +11,7 @@ class Command(BaseCommand):
                    help='Reindex a percentage of things'),)

    def handle(self, *args, **options):
        logging.basicConfig(level=logging.INFO)
        percent = options['percent']
        if percent > 100 or percent < 1:
            raise CommandError('percent should be between 1 and 100')
@@ -1,5 +1,5 @@
import logging
from django.core.management.base import BaseCommand

from search.es_utils import es_whazzup


@@ -7,4 +7,5 @@ class Command(BaseCommand):
    help = 'Shows elastic stats.'

    def handle(self, *args, **options):
        logging.basicConfig(level=logging.INFO)
        es_whazzup()
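The two management commands above are thin wrappers around es_utils; the same work can be kicked off from a Django shell (a sketch -- it assumes ES_HOSTS and the other ES settings are configured):

    from search.es_utils import es_reindex, es_whazzup

    es_reindex(percent=10)   # rebuild mappings and reindex ~10% of each doctype
    es_whazzup()             # log document counts for each registered doctype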
@ -1,3 +1,152 @@
|
|||
# We do this here to guarantee that es_utils gets imported and thus
|
||||
# its request_finished signal handler is registered.
|
||||
import search.es_utils
|
||||
import elasticutils
|
||||
import logging
|
||||
import pyes
|
||||
from threading import local
|
||||
|
||||
from django.conf import settings
|
||||
from django.core import signals
|
||||
|
||||
from search.tasks import index_task, unindex_task
|
||||
|
||||
log = logging.getLogger('es_search')
|
||||
|
||||
|
||||
# db_table name -> model Class for search models
|
||||
_search_models = {}
|
||||
|
||||
|
||||
def get_search_models():
|
||||
"""Returns a list of model classes"""
|
||||
# TODO: if we do weakrefs, then we should remove dead refs here.
|
||||
|
||||
values = _search_models.values()
|
||||
|
||||
# Sort to stabilize.
|
||||
values.sort(key=lambda cls: cls._meta.db_table)
|
||||
return values
|
||||
|
||||
|
||||
_local_tasks = local()
|
||||
_local_tasks.es_index_task_set = set()
|
||||
|
||||
|
||||
class SearchMixin(object):
|
||||
"""This mixin adds ES indexing support for the model.
|
||||
|
||||
When using this mixin, make sure to implement:
|
||||
|
||||
* get_mapping
|
||||
* extract_document
|
||||
|
||||
Additionally, after defining your model, register it as a
|
||||
search model::
|
||||
|
||||
MyModel.register_search_model()
|
||||
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def register_search_model(cls):
|
||||
"""Registers a model as being involved with ES indexing"""
|
||||
# TODO: Fix this to use weakrefs
|
||||
_search_models[cls._meta.db_table] = cls
|
||||
|
||||
@classmethod
|
||||
def get_mapping(self):
|
||||
"""Returns the ES mapping defition for this document type
|
||||
|
||||
This must be implemented. It should return an ES mapping.
|
||||
|
||||
For examples, see the codebase.
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def extract_document(self):
|
||||
"""Extracts the ES index document for this instance
|
||||
|
||||
This must be implemented. It should return a dict representing
|
||||
the document to be indexed.
|
||||
|
||||
For examples, see the codebase.
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def _get_index(cls):
|
||||
"""Returns the index for this class"""
|
||||
indexes = settings.ES_INDEXES
|
||||
return indexes.get(cls._meta.db_table) or indexes['default']
|
||||
|
||||
@classmethod
|
||||
def add_index_task(cls, ids):
|
||||
"""Adds an index task.
|
||||
|
||||
:arg ids: tuple of ids
|
||||
|
||||
"""
|
||||
_local_tasks.es_index_task_set.add((index_task.delay, (cls, ids)))
|
||||
|
||||
@classmethod
|
||||
def add_unindex_task(cls, ids):
|
||||
"""Creates a task to remove this document from the ES index
|
||||
|
||||
:arg ids: tuple of ids
|
||||
|
||||
"""
|
||||
_local_tasks.es_index_task_set.add((unindex_task.delay, (cls, ids)))
|
||||
|
||||
@classmethod
|
||||
def index(cls, document, bulk=False, force_insert=False, refresh=False,
|
||||
es=None):
|
||||
"""Indexes a single document"""
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
|
||||
if es is None:
|
||||
es = elasticutils.get_es()
|
||||
|
||||
index = cls._get_index()
|
||||
doc_type = cls._meta.db_table
|
||||
|
||||
# TODO: handle pyes.urllib3.TimeoutErrors here.
|
||||
es.index(document, index=index, doc_type=doc_type, id=document['id'],
|
||||
bulk=bulk, force_insert=force_insert)
|
||||
|
||||
if refresh:
|
||||
es.refresh(timesleep=0)
|
||||
|
||||
@classmethod
|
||||
def unindex(cls, id):
|
||||
"""Removes a document from the index"""
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
|
||||
index = cls._get_index()
|
||||
doc_type = cls._meta.db_table
|
||||
try:
|
||||
elasticutils.get_es().delete(index, doc_type, id)
|
||||
except pyes.exceptions.NotFoundException:
|
||||
# Ignore the case where we try to delete something that's
|
||||
# not there.
|
||||
pass
|
||||
|
||||
|
||||
def generate_tasks(**kwargs):
|
||||
"""Goes through thread local index update tasks set and generates
|
||||
celery tasks for all tasks in the set.
|
||||
|
||||
Because this works off of a set, it naturally de-dupes the tasks,
|
||||
so if four tasks get tossed into the set that are identical, we
|
||||
execute it only once.
|
||||
|
||||
"""
|
||||
lt = _local_tasks
|
||||
for fun, args in lt.es_index_task_set:
|
||||
fun(*args)
|
||||
|
||||
lt.es_index_task_set.clear()
|
||||
|
||||
|
||||
signals.request_finished.connect(generate_tasks)
|
||||
|
|
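To make the de-duping in generate_tasks() concrete, here is roughly what happens over the course of one request (Foo stands in for any registered search model; it is not a real class in this commit):

    Foo.add_index_task((1,))
    Foo.add_index_task((1,))   # same (task, args) tuple -- absorbed by the set
    Foo.add_index_task((2,))

    generate_tasks()           # fires index_task.delay(Foo, (1,)) and
                               # index_task.delay(Foo, (2,)): two tasks, not three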
@@ -52,3 +52,17 @@ def reindex_with_progress(waffle_when_done=False):
    finally:
        cache.delete(ES_REINDEX_PROGRESS)
        cache.delete(ES_WAFFLE_WHEN_DONE)


@task
def index_task(cls, ids, **kw):
    """Indexes documents specified by cls and ids"""
    for obj in cls.uncached.filter(id__in=ids):
        cls.index(obj.extract_document(), refresh=True)


@task
def unindex_task(cls, ids, **kw):
    """Unindexes documents specified by cls and ids"""
    for id in ids:
        cls.unindex(id)
|
@ -2,50 +2,51 @@ import json
|
|||
|
||||
from nose.tools import eq_
|
||||
|
||||
from sumo.tests import TestCase, LocalizingClient, ElasticTestCase
|
||||
from sumo.tests import LocalizingClient, ElasticTestCase
|
||||
from sumo.urlresolvers import reverse
|
||||
|
||||
from search.models import generate_tasks
|
||||
from questions.tests import question, answer, answer_vote
|
||||
from questions.models import Question
|
||||
from wiki.tests import document, revision
|
||||
from forums.tests import thread, post
|
||||
from search import es_utils
|
||||
import mock
|
||||
|
||||
|
||||
class ElasticSearchTasksTests(TestCase):
|
||||
def test_tasks(self):
|
||||
class ElasticSearchTasksTests(ElasticTestCase):
|
||||
@mock.patch.object(Question, 'index')
|
||||
def test_tasks(self, index_fun):
|
||||
"""Tests to make sure tasks are added and run"""
|
||||
|
||||
times_run = []
|
||||
q = question()
|
||||
# Don't call self.refresh here since that calls generate_tasks().
|
||||
|
||||
def run_task(*args):
|
||||
times_run.append(1)
|
||||
eq_(index_fun.call_count, 0)
|
||||
|
||||
es_utils.add_index_task(run_task, (1,))
|
||||
q.save()
|
||||
generate_tasks()
|
||||
|
||||
eq_(len(times_run), 0)
|
||||
eq_(index_fun.call_count, 1)
|
||||
|
||||
es_utils.generate_tasks()
|
||||
|
||||
eq_(len(times_run), 1)
|
||||
|
||||
def test_tasks_squashed(self):
|
||||
@mock.patch.object(Question, 'index')
|
||||
def test_tasks_squashed(self, index_fun):
|
||||
"""Tests to make sure tasks are squashed"""
|
||||
|
||||
times_run = []
|
||||
q = question()
|
||||
# Don't call self.refresh here since that calls generate_tasks().
|
||||
|
||||
def run_task(*args):
|
||||
times_run.append(1)
|
||||
eq_(index_fun.call_count, 0)
|
||||
|
||||
es_utils.add_index_task(run_task, (1,))
|
||||
es_utils.add_index_task(run_task, (1,))
|
||||
es_utils.add_index_task(run_task, (1,))
|
||||
es_utils.add_index_task(run_task, (1,))
|
||||
q.save()
|
||||
q.save()
|
||||
q.save()
|
||||
q.save()
|
||||
|
||||
eq_(len(times_run), 0)
|
||||
eq_(index_fun.call_count, 0)
|
||||
|
||||
es_utils.generate_tasks()
|
||||
generate_tasks()
|
||||
|
||||
eq_(len(times_run), 1)
|
||||
eq_(index_fun.call_count, 1)
|
||||
|
||||
|
||||
class ElasticSearchViewTests(ElasticTestCase):
|
||||
|
|
|
@ -3,6 +3,7 @@ from django.db import models
|
|||
|
||||
import caching.base
|
||||
|
||||
|
||||
# Our apps should subclass ManagerBase instead of models.Manager or
|
||||
# caching.base.CachingManager directly.
|
||||
ManagerBase = caching.base.CachingManager
|
||||
|
|
@@ -83,7 +83,7 @@ class ElasticTestCase(TestCase):
        # index is ready to be queried. Given that, it's almost
        # always the case that we want to run all the generated tasks,
        # then refresh.
        from search.es_utils import generate_tasks
        from search.models import generate_tasks
        generate_tasks()

        es = get_es()
@ -1,162 +0,0 @@
|
|||
import elasticutils
|
||||
import logging
|
||||
import pyes
|
||||
import time
|
||||
|
||||
from search.es_utils import (TYPE, INTEGER, STRING, INDEX, NOTANALYZED,
|
||||
ANALYZER, SNOWBALL, BOOLEAN, DATE, get_index)
|
||||
|
||||
|
||||
log = logging.getLogger('k.wiki.es_search')
|
||||
|
||||
|
||||
def setup_mapping(index):
|
||||
from wiki.models import Document
|
||||
|
||||
mapping = {
|
||||
'properties': {
|
||||
'id': {TYPE: INTEGER},
|
||||
'title': {TYPE: STRING, ANALYZER: SNOWBALL},
|
||||
'locale': {TYPE: STRING, INDEX: NOTANALYZED},
|
||||
'current': {TYPE: INTEGER},
|
||||
'parent_id': {TYPE: INTEGER},
|
||||
'content':
|
||||
{TYPE: STRING, ANALYZER: SNOWBALL},
|
||||
'category': {TYPE: INTEGER},
|
||||
'slug': {TYPE: STRING},
|
||||
'is_archived': {TYPE: BOOLEAN},
|
||||
'summary': {TYPE: STRING, ANALYZER: SNOWBALL},
|
||||
'keywords': {TYPE: STRING, ANALYZER: SNOWBALL},
|
||||
'updated': {TYPE: DATE},
|
||||
'tag': {TYPE: STRING}
|
||||
}
|
||||
}
|
||||
|
||||
es = elasticutils.get_es()
|
||||
|
||||
# TODO: If the mapping is there already and we do a put_mapping,
|
||||
# does that stomp on the existing mapping or raise an error?
|
||||
try:
|
||||
es.put_mapping(Document._meta.db_table, mapping, index)
|
||||
except pyes.exceptions.ElasticSearchException, e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
def extract_document(doc):
|
||||
"""Extracts indexable attributes from a Document"""
|
||||
d = {}
|
||||
d['id'] = doc.id
|
||||
d['title'] = doc.title
|
||||
d['locale'] = doc.locale
|
||||
d['parent_id'] = doc.parent.id if doc.parent else None
|
||||
d['content'] = doc.html
|
||||
d['category'] = doc.category
|
||||
d['slug'] = doc.slug
|
||||
d['is_archived'] = doc.is_archived
|
||||
if doc.parent is None:
|
||||
d['tag'] = [tag['name'] for tag in doc.tags.values()]
|
||||
else:
|
||||
# Translations inherit tags from their parents.
|
||||
d['tag'] = [tag['name'] for tag in doc.parent.tags.values()]
|
||||
if doc.current_revision:
|
||||
d['summary'] = doc.current_revision.summary
|
||||
d['keywords'] = doc.current_revision.keywords
|
||||
d['updated'] = doc.current_revision.created
|
||||
d['current'] = doc.current_revision.id
|
||||
else:
|
||||
d['summary'] = None
|
||||
d['keywords'] = None
|
||||
d['updated'] = None
|
||||
d['current'] = None
|
||||
return d
|
||||
|
||||
|
||||
def index_doc(doc, bulk=False, force_insert=False, es=None, refresh=False):
|
||||
from wiki.models import Document
|
||||
|
||||
if es is None:
|
||||
es = elasticutils.get_es()
|
||||
|
||||
index = get_index(Document)
|
||||
|
||||
try:
|
||||
es.index(doc, index, doc_type=Document._meta.db_table,
|
||||
id=doc['id'], bulk=bulk, force_insert=force_insert)
|
||||
except pyes.urllib3.TimeoutError:
|
||||
# If we have a timeout, try it again rather than die. If we
|
||||
# have a second one, that will cause everything to die.
|
||||
es.index(doc, index, doc_type=Document._meta.db_table,
|
||||
id=doc['id'], bulk=bulk, force_insert=force_insert)
|
||||
|
||||
if refresh:
|
||||
es.refresh(timesleep=0)
|
||||
|
||||
|
||||
def unindex_documents(ids):
|
||||
from wiki.models import Document
|
||||
|
||||
es = elasticutils.get_es()
|
||||
index = get_index(Document)
|
||||
|
||||
for doc_id in ids:
|
||||
try:
|
||||
es.delete(index, doc_type=Document._meta.db_table, id=doc_id)
|
||||
except pyes.exceptions.NotFoundException:
|
||||
# If the document isn't in the index, then we ignore it.
|
||||
# TODO: Is that right?
|
||||
pass
|
||||
|
||||
|
||||
def reindex_documents(percent):
|
||||
"""Iterate over this to update the mapping and index all documents.
|
||||
|
||||
Yields number of documents done.
|
||||
|
||||
Note: This gets called from the commandline, so we do some logging
|
||||
so the user knows what's going on.
|
||||
|
||||
:arg percent: The percentage of questions to index. Defaults to
|
||||
100--e.g. all of them.
|
||||
|
||||
"""
|
||||
from wiki.models import Document
|
||||
from django.conf import settings
|
||||
|
||||
index = get_index(Document)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
log.info('reindex documents: %s %s', index, Document._meta.db_table)
|
||||
|
||||
es = pyes.ES(settings.ES_HOSTS, timeout=10.0)
|
||||
|
||||
log.info('setting up mapping....')
|
||||
setup_mapping(index)
|
||||
|
||||
log.info('iterating through documents....')
|
||||
total = Document.objects.count()
|
||||
to_index = int(total * (percent / 100.0))
|
||||
log.info('total documents: %s (to be indexed: %s)', total, to_index)
|
||||
total = to_index
|
||||
|
||||
t = 0
|
||||
for d in Document.objects.order_by('id').all():
|
||||
t += 1
|
||||
if t % 1000 == 0:
|
||||
time_to_go = (total - t) * ((time.time() - start_time) / t)
|
||||
if time_to_go < 60:
|
||||
time_to_go = "%d secs" % time_to_go
|
||||
else:
|
||||
time_to_go = "%d min" % (time_to_go / 60)
|
||||
log.info('%s/%s... (%s to go)', t, total, time_to_go)
|
||||
es.flush_bulk(forced=True)
|
||||
|
||||
if t > total:
|
||||
break
|
||||
|
||||
index_doc(extract_document(d), bulk=True, es=es)
|
||||
yield t
|
||||
|
||||
es.flush_bulk(forced=True)
|
||||
log.info('done!')
|
||||
es.refresh()
|
|
@ -18,7 +18,7 @@ from tidings.models import NotificationsMixin
|
|||
from tower import ugettext_lazy as _lazy, ugettext as _
|
||||
|
||||
from search import searcher
|
||||
from search import es_utils
|
||||
from search.models import SearchMixin
|
||||
from search.utils import crc32
|
||||
from sumo import ProgrammingError
|
||||
from sumo_locales import LOCALES
|
||||
|
@ -188,7 +188,8 @@ class _NotDocumentView(Exception):
|
|||
"""A URL not pointing to the document view was passed to from_url()."""
|
||||
|
||||
|
||||
class Document(NotificationsMixin, ModelBase, BigVocabTaggableMixin):
|
||||
class Document(NotificationsMixin, ModelBase, BigVocabTaggableMixin,
|
||||
SearchMixin):
|
||||
"""A localized knowledgebase document, not revision-specific."""
|
||||
title = models.CharField(max_length=255, db_index=True)
|
||||
slug = models.CharField(max_length=255, db_index=True)
|
||||
|
@ -627,55 +628,93 @@ class Document(NotificationsMixin, ModelBase, BigVocabTaggableMixin):
|
|||
from wiki.events import EditDocumentEvent
|
||||
return EditDocumentEvent.is_notifying(user, self)
|
||||
|
||||
@classmethod
|
||||
def get_mapping(cls):
|
||||
mapping = {
|
||||
'properties': {
|
||||
'id': {'type': 'integer'},
|
||||
'title': {'type': 'string', 'analyzer': 'snowball'},
|
||||
'locale': {'type': 'string', 'index': 'not_analyzed'},
|
||||
'current': {'type': 'integer'},
|
||||
'parent_id': {'type': 'integer'},
|
||||
'content':
|
||||
{'type': 'string', 'analyzer': 'snowball'},
|
||||
'category': {'type': 'integer'},
|
||||
'slug': {'type': 'string'},
|
||||
'is_archived': {'type': 'boolean'},
|
||||
'summary': {'type': 'string', 'analyzer': 'snowball'},
|
||||
'keywords': {'type': 'string', 'analyzer': 'snowball'},
|
||||
'updated': {'type': 'date'},
|
||||
'tag': {'type': 'string'}
|
||||
}
|
||||
}
|
||||
return mapping
|
||||
|
||||
@receiver(post_save, sender=Document,
|
||||
dispatch_uid='wiki.search.index.document.save')
|
||||
def update_document_from_index(sender, instance, **kw):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if not settings.ES_LIVE_INDEXING or kw.get('raw'):
|
||||
return
|
||||
|
||||
from wiki.tasks import index_documents
|
||||
es_utils.add_index_task(index_documents.delay, (instance.id,))
|
||||
def extract_document(self):
|
||||
d = {}
|
||||
d['id'] = self.id
|
||||
d['title'] = self.title
|
||||
d['locale'] = self.locale
|
||||
d['parent_id'] = self.parent.id if self.parent else None
|
||||
d['content'] = self.html
|
||||
d['category'] = self.category
|
||||
d['slug'] = self.slug
|
||||
d['is_archived'] = self.is_archived
|
||||
if self.parent is None:
|
||||
d['tag'] = [tag['name'] for tag in self.tags.values()]
|
||||
else:
|
||||
# Translations inherit tags from their parents.
|
||||
d['tag'] = [tag['name'] for tag in self.parent.tags.values()]
|
||||
if self.current_revision:
|
||||
d['summary'] = self.current_revision.summary
|
||||
d['keywords'] = self.current_revision.keywords
|
||||
d['updated'] = self.current_revision.created
|
||||
d['current'] = self.current_revision.id
|
||||
else:
|
||||
d['summary'] = None
|
||||
d['keywords'] = None
|
||||
d['updated'] = None
|
||||
d['current'] = None
|
||||
return d
|
||||
|
||||
|
||||
@receiver(post_save, sender=TaggedItem,
|
||||
dispatch_uid='wiki.search.index.tags.save')
|
||||
def update_wiki_tags_in_index(sender, instance, **kwargs):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if (not settings.ES_LIVE_INDEXING or kwargs.get('raw') or
|
||||
not isinstance(instance.content_object, Document)):
|
||||
return
|
||||
|
||||
from wiki.tasks import index_documents
|
||||
es_utils.add_index_task(index_documents.delay,
|
||||
(instance.content_object.id,))
|
||||
# Register this as a model we index in ES.
|
||||
Document.register_search_model()
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=Document,
|
||||
dispatch_uid='wiki.search.index.document.delete')
|
||||
def remove_document_from_index(sender, instance, **kw):
|
||||
if not settings.ES_LIVE_INDEXING:
|
||||
return
|
||||
|
||||
from wiki.tasks import unindex_documents
|
||||
unindex_documents([instance.id])
|
||||
def _update_w_index(sender, instance, **kw):
|
||||
"""Given a Document, creates an index task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=TaggedItem,
|
||||
dispatch_uid='wiki.search.index.tags.delete')
|
||||
def update_wiki_in_index_on_tags_delete(sender, instance, **kwargs):
|
||||
# raw is True when saving a model exactly as presented--like when
|
||||
# loading fixtures. In this case we don't want to trigger.
|
||||
if (not settings.ES_LIVE_INDEXING or kwargs.get('raw') or
|
||||
not isinstance(instance.content_object, Document)):
|
||||
return
|
||||
def _remove_w_index(sender, instance, **kw):
|
||||
"""Given a Document, create an unindex task"""
|
||||
if not kw.get('raw'):
|
||||
obj = instance
|
||||
obj.__class__.add_unindex_task((obj.id,))
|
||||
|
||||
from wiki.tasks import index_documents
|
||||
es_utils.add_index_task(index_documents.delay,
|
||||
(instance.content_object.id,))
|
||||
|
||||
def _update_tag_index(sender, instance, **kw):
|
||||
"""Given a TaggedItem for a Document, creates an index task"""
|
||||
obj = instance.content_object
|
||||
if not kw.get('raw') and isinstance(obj, Document):
|
||||
obj.__class__.add_index_task((obj.id,))
|
||||
|
||||
|
||||
w_es_post_save = receiver(
|
||||
post_save, sender=Document,
|
||||
dispatch_uid='w.es.post_save')(_update_w_index)
|
||||
w_es_pre_delete = receiver(
|
||||
pre_delete, sender=Document,
|
||||
dispatch_uid='w.es.pre_delete')(_remove_w_index)
|
||||
w_tag_post_save = receiver(
|
||||
post_save, sender=TaggedItem,
|
||||
dispatch_uid='w.tag.es.post_save')(_update_tag_index)
|
||||
w_tag_pre_delete = receiver(
|
||||
pre_delete, sender=TaggedItem,
|
||||
dispatch_uid='w.tag.es.pre_delete')(_update_tag_index)
|
||||
|
||||
|
||||
class Revision(ModelBase):
|
||||
|
|
@@ -2,7 +2,6 @@ import elasticutils
from nose.tools import eq_

from sumo.tests import ElasticTestCase
from wiki.es_search import extract_document
from wiki.tests import document
from wiki.models import Document

@@ -31,14 +30,18 @@ class TestPostUpdate(ElasticTestCase):
        doc2.tags.add(u'badtag')

        # Verify the parent has the right tags.
        doc_dict = extract_document(doc1)
        doc_dict = doc1.extract_document()
        eq_(doc_dict['tag'], [u'desktop', u'windows'])

        # Verify the translation has the parent's tags.
        doc_dict = extract_document(doc2)
        doc_dict = doc2.extract_document()
        eq_(doc_dict['tag'], [u'desktop', u'windows'])

    def test_wiki_tags(self):
        """Make sure that adding tags to a Document causes it to
        refresh the index.

        """
        tag = u'hiphop'
        eq_(elasticutils.S(Document).filter(tag=tag).count(), 0)
        doc = document(save=True)
@@ -166,22 +166,46 @@ override in ``settings_local.py``::
``ELASTICDIR/config/elasticsearch.yml``. So if you change it in
one place, you must also change it in the other.

You can also set ``USE_ELASTIC`` in your ``settings_local.py`` file.
This affects whether Kitsune does Elastic indexing when data changes
in the ``post_save`` and ``pre_delete`` hooks. For tests,
``USE_ELASTIC`` is set to ``False`` except for Elastic specific tests.

There are a few other settings you can set in your settings_local.py
There are a few other settings you can set in your ``settings_local.py``
file that override Elastic Utils defaults. See `the Elastic Utils
docs <http://elasticutils.readthedocs.org/en/latest/installation.html#configure>`_
for details.

.. Note::
Other things you can change:

   One problem I have on my machine is that it takes a while for
   Elastic to do stuff. ``ES_TIMEOUT`` defaults to 1, but I set it to
   2 in my ``settings_local.py`` file which reduces the number of
   timeout errors I get.
``ES_LIVE_INDEXING``

    You can also set ``ES_LIVE_INDEXING`` in your
    ``settings_local.py`` file. This affects whether Kitsune does
    Elastic indexing when data changes in the ``post_save`` and
    ``pre_delete`` hooks.

    For tests, ``ES_LIVE_INDEXING`` is set to ``False`` except for
    Elastic specific tests so we're not spending a ton of time
    indexing things we're not using.

``ES_FLUSH_BULK_EVERY``

    We do bulk indexing meaning we queue up a bunch and then push them
    through all at the same time. This requires memory to queue them,
    so if you've got low memory, dropping this value to something
    lower (but still greater than 1) could help.

``ES_TIMEOUT``

    This affects timeouts for search-related requests.

    If you're having problems with ES being slow, raising this number
    can be helpful.

``ES_INDEXING_TIMEOUT``

    This affects all index-related operations including creating
    indexes, deleting indexes, creating mappings, indexing documents
    and calling flush_bulk.

    If you're having problems with indexing operations timing out,
    raising this number can sometimes help.
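For a concrete picture, a ``settings_local.py`` override using these knobs might look like this (the values are only examples, not recommendations)::

    ES_LIVE_INDEXING = True       # index in the post_save/pre_delete hooks
    ES_TIMEOUT = 5                # seconds for search-time requests
    ES_INDEXING_TIMEOUT = 30      # seconds for mappings, indexing, flush_bulk
    ES_FLUSH_BULK_EVERY = 100     # queue fewer docs per bulk push on low memory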
Using Elastic Search
@@ -587,9 +587,11 @@ SESSION_EXISTS_COOKIE = 'sumo_session'
ES_HOSTS = ['127.0.0.1:9200']
ES_INDEXES = {'default': 'sumo'}
ES_LIVE_INDEXING = False  # Keep indexes up to date as objects are made/deleted
ES_TIMEOUT = 5  # 5 second timeouts for querying/indexing
ES_TIMEOUT = 5  # 5 second timeouts for querying
ES_INDEXING_TIMEOUT = 30  # 30 second timeouts for all things indexing
# Seconds between updating admin progress bar:
ES_REINDEX_PROGRESS_BAR_INTERVAL = 5
ES_FLUSH_BULK_EVERY = 1000

#
# Connection information for Sphinx search