only index into ES explicitly and do so in a "queue", for addons only initially (bug 749696)

This commit is contained in:
Andy McKay 2012-09-28 15:27:56 -07:00
Родитель 058f8c5a06
Коммит 980a44b81e
8 изменённых файлов: 154 добавлений и 0 удалений

Просмотреть файл

@ -13,6 +13,7 @@ import amo
from amo.decorators import set_modified_on, write
from amo.utils import (attach_trans_dict, cache_ns_key, sorted_groupby,
ImageCheck)
from lib.es.hold import add, process
from market.models import AddonPremium
from tags.models import Tag
from versions.models import Version
@ -97,6 +98,17 @@ def delete_preview_files(id, **kw):
@task
def index_addons(ids, **kw):
# For the moment, only do this in the test suite.
if settings.IN_TEST_SUITE:
for pk in ids:
add(index_addon_callback, pk)
else:
# We don't do a lot of multiple calls on requests, so let's just call
# that normally.
index_addon_callback(ids)
def index_addon_callback(ids):
es = elasticutils.get_es()
log.info('Indexing addons %s-%s. [%s]' % (ids[0], ids[-1], len(ids)))
qs = Addon.uncached.filter(id__in=ids)

Просмотреть файл

@ -35,6 +35,7 @@ from applications.models import Application, AppVersion
from bandwagon.models import Collection
from files.helpers import copyfileobj
from files.models import File, Platform
from lib.es.signals import reset, process
from market.models import AddonPremium, Price, PriceCurrency
from versions.models import ApplicationsVersions, Version
@ -212,6 +213,7 @@ class TestCase(RedisTest, test_utils.TestCase):
def setUpClass(cls):
if cls.mock_es:
[p.start() for p in ES_patchers]
reset.send(None) # Reset all the ES tasks on hold.
super(TestCase, cls).setUpClass()
@classmethod
@ -603,8 +605,14 @@ class ESTestCase(TestCase):
cls.add_addons()
cls.refresh()
@classmethod
def send(cls):
# Send all the ES tasks on hold.
process.send(None)
@classmethod
def refresh(cls, index='default', timesleep=0):
process.send(None)
cls.es.refresh(settings.ES_INDEXES[index], timesleep=timesleep)
@classmethod

0
lib/es/__init__.py Normal file
Просмотреть файл

9
lib/es/context.py Normal file
Просмотреть файл

@ -0,0 +1,9 @@
from contextlib import contextmanager
from signals import process
@contextmanager
def send():
yield
process.send(None)

12
lib/es/decorators.py Normal file
Просмотреть файл

@ -0,0 +1,12 @@
import functools
from signals import process
def send(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
res = func(*args, **kwargs)
process.send(None)
return res
return wrapper

49
lib/es/hold.py Normal file
Просмотреть файл

@ -0,0 +1,49 @@
from threading import local
import signals
_locals = local()
# This is heavily based on https://github.com/mozilla/kitsune/commit/85936b
# With a few tweaks.
def setup():
# Add in the tasks object. Will return True if it was created.
if not hasattr(_locals, 'tasks'):
_locals.tasks = set()
return True
def add(fun, pk):
# By using a set, we ensure that the pk is only added once per func.
setup()
_locals.tasks.add((fun, pk))
def reset(**kwargs):
setup()
_locals.tasks.clear()
def process(**kwargs):
# This will uniquify the tasks even more so that we only call each
# index method once, for all the ids added to the list.
#
# This requires there to be a uniquely named index method that uses
# this holding system.
if setup():
return
uniq = {}
for fun, pk in _locals.tasks:
uniq.setdefault(fun.__name__, {'func': fun, 'ids': []})
uniq[fun.__name__]['ids'].append(pk)
for v in uniq.values():
v['func'](v['ids'])
_locals.tasks.clear()
signals.reset.connect(reset, dispatch_uid='reset_es_tasks')
signals.process.connect(process, dispatch_uid='process_es_tasks')

6
lib/es/signals.py Normal file
Просмотреть файл

@ -0,0 +1,6 @@
from django.dispatch import Signal
# This resets the messages that were going to go to ES.
reset = Signal(providing_args=[])
# This sends all the messages to ES and then resets the queue.
process = Signal(providing_args=[])

58
lib/es/tests.py Normal file
Просмотреть файл

@ -0,0 +1,58 @@
import mock
from nose.tools import eq_
import amo.tests
import context
import decorators
from hold import _locals, process, reset, add
class ESHold(amo.tests.TestCase):
def setUp(self):
reset()
self.callback = mock.Mock()
self.callback.__name__ = 'foo'
def test_add(self):
add(self.callback, 1)
eq_(len(_locals.tasks), 1)
add(self.callback, 1)
eq_(len(_locals.tasks), 1)
add(self.callback, 2)
eq_(len(_locals.tasks), 2)
callback = mock.Mock()
callback.__name__ = 'bar'
add(callback, 2)
eq_(len(_locals.tasks), 3)
def test_reset(self):
add(self.callback, 1)
eq_(len(_locals.tasks), 1)
reset()
eq_(len(_locals.tasks), 0)
def test_process(self):
add(self.callback, 1)
process()
assert self.callback.called
def test_process_groups(self):
add(self.callback, 1)
add(self.callback, 4)
process()
eq_(set(self.callback.call_args[0][0]), set([1, 4]))
def test_context(self):
with context.send():
add(self.callback, 1)
assert self.callback.called
def test_decorators(self):
@decorators.send
def foo():
add(self.callback, 1)
foo()
assert self.callback.called