diff --git a/.travis.yml b/.travis.yml
index 61c3b01b40..75ba41a55e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,9 +4,8 @@ python:
 services:
   - memcached
 before_install:
-  - wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-0.90.13.deb && sudo dpkg -i elasticsearch-0.90.13.deb
-  - sudo /usr/share/elasticsearch/bin/plugin -install elasticsearch/elasticsearch-analysis-icu/1.13.0
-  - sudo /usr/share/elasticsearch/bin/elasticsearch -Des.config=scripts/elasticsearch/elasticsearch.yml
+  - wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.2.deb && sudo dpkg -i elasticsearch-1.3.2.deb
+  - sudo /usr/share/elasticsearch/bin/elasticsearch -d -D es.path.data=/tmp -D es.gateway.type=none -D es.index.store.type=memory -D es.discovery.zen.ping.multicast.enabled=false
 install:
   - make update_deps
   - pip install --no-deps -r requirements/test.txt --find-links https://pyrepo.addons.mozilla.org/
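[Editor's note] The CI change above boots a throwaway Elasticsearch 1.3.2 with in-memory storage. A quick sanity check against the server the new client talks to might look like this (a minimal sketch; localhost:9200 is simply the default this patch assumes):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(['localhost:9200'])
    # info() returns the server banner; the code in this patch targets the
    # 1.x API (function_score, the indices.* namespace), so expect 1.x here.
    print es.info()['version']['number']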
diff --git a/apps/addons/cron.py b/apps/addons/cron.py
index 3b02074024..9329fe871e 100644
--- a/apps/addons/cron.py
+++ b/apps/addons/cron.py
@@ -21,7 +21,6 @@ import waffle
 import amo
 from amo.decorators import write
 from amo.utils import chunked, walkfiles
-from addons import search
 from addons.models import Addon, AppSupport, FrozenAddon, Persona
 from files.models import File
 from lib.es.utils import raise_if_reindex_in_progress
@@ -474,10 +473,8 @@ def give_personas_versions():


 @cronjobs.register
-def reindex_addons(index=None, aliased=True, addon_type=None):
+def reindex_addons(index=None, addon_type=None):
     from . import tasks
-    # Make sure our mapping is up to date.
-    search.setup_mapping(index, aliased)
     ids = (Addon.objects.values_list('id', flat=True)
            .filter(_current_version__isnull=False,
                    status__in=amo.VALID_STATUSES,
diff --git a/apps/addons/search.py b/apps/addons/search.py
index 92610054a6..10ae7e504c 100644
--- a/apps/addons/search.py
+++ b/apps/addons/search.py
@@ -4,14 +4,17 @@ from operator import attrgetter

 from django.conf import settings
 from django.core.exceptions import ObjectDoesNotExist

-import pyes.exceptions as pyes
-
 import amo
 import amo.search
+from amo.models import SearchMixin
+from addons.cron import reindex_addons
 from addons.models import Persona
-from amo.utils import create_es_index_if_missing
+from bandwagon.cron import reindex_collections
 from bandwagon.models import Collection
+from compat.cron import compatibility_report
 from compat.models import AppCompat
+from lib.es.utils import create_index
+from users.cron import reindex_users
 from users.models import UserProfile
 from versions.compare import version_int
@@ -64,7 +67,7 @@ def extract(addon):
             # This would otherwise get attached when by the transformer.
             d['weekly_downloads'] = addon.persona.popularity
             # Boost on popularity.
-            d['_boost'] = addon.persona.popularity ** .2
+            d['boost'] = addon.persona.popularity ** .2
             d['has_theme_rereview'] = (
                 addon.persona.rereviewqueuetheme_set.exists())
         except Persona.DoesNotExist:
@@ -73,10 +76,10 @@ def extract(addon):
     else:
         # Boost by the number of users on a logarithmic scale. The maximum
         # boost (11,000,000 users for adblock) is about 5x.
-        d['_boost'] = addon.average_daily_users ** .2
+        d['boost'] = addon.average_daily_users ** .2
     # Double the boost if the add-on is public.
     if addon.status == amo.STATUS_PUBLIC and 'boost' in d:
-        d['_boost'] = max(d['_boost'], 1) * 4
+        d['boost'] = max(d['boost'], 1) * 4

     # Indices for each language. languages is a list of locales we want to
     # index with analyzer if the string's locale matches.
@@ -98,28 +101,40 @@ def extract(addon):
     return d


-def setup_mapping(index=None, aliased=True):
-    """Set up the addons index mapping."""
+def get_alias():
+    return settings.ES_INDEXES.get(SearchMixin.ES_ALIAS_KEY)
+
+
+def create_new_index(index=None, config=None):
+    if config is None:
+        config = {}
+    if index is None:
+        index = get_alias()
+    config['settings'] = {'index': INDEX_SETTINGS}
+    config['mappings'] = get_mappings()
+    create_index(index, config)
+
+
+def get_mappings():
     # Mapping describes how elasticsearch handles a document during indexing.
     # Most fields are detected and mapped automatically.
-    appver = {'dynamic': False, 'properties': {'max': {'type': 'long'},
-                                               'min': {'type': 'long'}}}
-    mapping = {
-        # Optional boosting during indexing.
-        '_boost': {'name': '_boost', 'null_value': 1.0},
+    appver = {
+        'dynamic': False, 'properties': {
+            'max': {'type': 'long'},
+            'min': {'type': 'long'}
+        }
+    }
+    mapping = {
+        'properties': {
+            'boost': {'type': 'float', 'null_value': 1.0},
             # Turn off analysis on name so we can sort by it.
             'name_sort': {'type': 'string', 'index': 'not_analyzed'},
             # Adding word-delimiter to split on camelcase and punctuation.
-            'name': {'type': 'string',
-                     'analyzer': 'standardPlusWordDelimiter'},
-            'summary': {'type': 'string',
-                        'analyzer': 'snowball'},
-            'description': {'type': 'string',
-                            'analyzer': 'snowball'},
-            'tags': {'type': 'string',
-                     'index': 'not_analyzed',
-                     'index_name': 'tag'},
+            'name': {'type': 'string', 'analyzer': 'standardPlusWordDelimiter'},
+            'summary': {'type': 'string', 'analyzer': 'snowball'},
+            'description': {'type': 'string', 'analyzer': 'snowball'},
+            'tags': {'type': 'string', 'index': 'not_analyzed', 'index_name': 'tag'},
             'platforms': {'type': 'integer', 'index_name': 'platform'},
             'appversion': {'properties': dict((app.id, appver)
                                               for app in amo.APP_USAGE)},
@@ -127,10 +142,9 @@ def extract(addon):
     }

     # Add room for language-specific indexes.
     for analyzer in amo.SEARCH_ANALYZER_MAP:
-        if (not settings.ES_USE_PLUGINS and
-                analyzer in amo.SEARCH_ANALYZER_PLUGINS):
-            log.info('While creating mapping, skipping the %s analyzer'
-                     % analyzer)
+        if (not settings.ES_USE_PLUGINS
+                and analyzer in amo.SEARCH_ANALYZER_PLUGINS):
+            log.info('While creating mapping, skipping the %s analyzer' % analyzer)
             continue

         mapping['properties']['name_' + analyzer] = {
@@ -146,14 +160,55 @@ def extract(addon):
             'analyzer': analyzer,
         }

-    es = amo.search.get_es()
-    # Adjust the mapping for all models at once because fields are shared
-    # across all doc types in an index. If we forget to adjust one of them
-    # we'll get burned later on.
-    for model in Addon, AppCompat, Collection, UserProfile:
-        index = index or model._get_index()
-        index = create_es_index_if_missing(index, aliased=aliased)
+    models = (Addon, AppCompat, Collection, UserProfile)
+    return dict((m._meta.db_table, mapping) for m in models)
+
+
+def reindex(index):
+    indexers = [
+        reindex_addons, reindex_collections, reindex_users, compatibility_report
+    ]
+    for indexer in indexers:
+        log.info('Indexing %r' % indexer.__name__)
         try:
-            es.put_mapping(model._meta.db_table, mapping, index)
-        except pyes.ElasticSearchException, e:
-            log.error(e)
+            indexer(index)
+        except Exception:
+            # We want to log this event but continue
+            log.error('Indexer %r failed' % indexer.__name__)
+
+
+INDEX_SETTINGS = {
+    "analysis": {
+        "analyzer": {
+            "standardPlusWordDelimiter": {
+                "tokenizer": "standard",
+                "filter": ["standard", "wordDelim", "lowercase", "stop", "dict"]
+            }
+        },
+        "filter": {
+            "wordDelim": {
+                "type": "word_delimiter",
+                "preserve_original": True
+            },
+            "dict": {
+                "type": "dictionary_decompounder",
+                "word_list": [
+                    "cool", "iris", "fire", "bug", "flag", "fox", "grease",
+                    "monkey", "flash", "block", "forecast", "screen", "grab",
+                    "cookie", "auto", "fill", "text", "all", "so", "think",
+                    "mega", "upload", "download", "video", "map", "spring",
+                    "fix", "input", "clip", "fly", "lang", "up", "down",
+                    "persona", "css", "html", "http", "ball", "firefox",
+                    "bookmark", "chat", "zilla", "edit", "menu", "menus",
+                    "status", "bar", "with", "easy", "sync", "search", "google",
+                    "time", "window", "js", "super", "scroll", "title", "close",
+                    "undo", "user", "inspect", "inspector", "browser",
+                    "context", "dictionary", "mail", "button", "url",
+                    "password", "secure", "image", "new", "tab", "delete",
+                    "click", "name", "smart", "down", "manager", "open",
+                    "query", "net", "link", "blog", "this", "color", "select",
+                    "key", "keys", "foxy", "translate", "word", ]
+            }
+        }
+    }
+}
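[Editor's note] A minimal sketch of how these new helpers chain together end to end; the timestamped name mirrors lib.es.utils.timestamp_index used by the reindex command later in this diff, and the explicit alias swap at the end is an illustrative assumption, not part of this module:

    from datetime import datetime

    from addons import search as addons_search
    from amo.search import get_es

    alias = addons_search.get_alias()    # settings.ES_INDEXES['default']
    new_index = '%s-%s' % (alias, datetime.now().strftime('%Y%m%d%H%M%S'))

    addons_search.create_new_index(new_index)   # settings + mappings
    addons_search.reindex(new_index)            # run all four indexers
    get_es().indices.update_aliases({'actions': [
        {'add': {'index': new_index, 'alias': alias}},
    ]})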
diff --git a/apps/addons/tests/test_cron.py b/apps/addons/tests/test_cron.py
index f7493e620f..a748206534 100644
--- a/apps/addons/tests/test_cron.py
+++ b/apps/addons/tests/test_cron.py
@@ -1,12 +1,9 @@
 import os
 import datetime

-from nose.exc import SkipTest
 from nose.tools import eq_
 import mock

-from django.conf import settings
-
 import amo
 import amo.tests
 from addons import cron
diff --git a/apps/addons/tests/test_models.py b/apps/addons/tests/test_models.py
index 30dcc203f5..2da4af91d1 100644
--- a/apps/addons/tests/test_models.py
+++ b/apps/addons/tests/test_models.py
@@ -26,7 +26,6 @@ from addons.models import (Addon, AddonCategory, AddonDependency,
                            BlacklistedSlug, Category, Charity, CompatOverride,
                            CompatOverrideRange, FrozenAddon,
                            IncompatibleVersions, Persona, Preview)
-from addons.search import setup_mapping
 from applications.models import Application, AppVersion
 from constants.applications import DEVICE_TYPES
 from devhub.models import ActivityLog, AddonLog, RssKey, SubmitStep
@@ -2181,12 +2180,10 @@ class TestSearchSignals(amo.tests.ESTestCase):

     def setUp(self):
         super(TestSearchSignals, self).setUp()
-        setup_mapping()
         self.addCleanup(self.cleanup)

     def cleanup(self):
-        for index in settings.ES_INDEXES.values():
-            self.es.delete_index_if_exists(index)
+        self.empty_index('default')

     def test_no_addons(self):
         eq_(Addon.search().count(), 0)
diff --git a/apps/amo/models.py b/apps/amo/models.py
index 10ab38a82a..ab8f60b12f 100644
--- a/apps/amo/models.py
+++ b/apps/amo/models.py
@@ -6,8 +6,8 @@ from django.db import models, transaction
 from django.utils import encoding, translation

 import caching.base
+import elasticsearch
 import multidb.pinning
-import pyes.exceptions
 import queryset_transform

 from . import search
@@ -291,24 +291,26 @@ class OnChangeMixin(object):


 class SearchMixin(object):

+    ES_ALIAS_KEY = 'default'
+
     @classmethod
     def _get_index(cls):
         indexes = settings.ES_INDEXES
-        return indexes.get(cls._meta.db_table) or indexes['default']
+        return indexes.get(cls.ES_ALIAS_KEY)

     @classmethod
-    def index(cls, document, id=None, bulk=False, index=None):
-        """Wrapper around pyes.ES.index."""
+    def index(cls, document, id=None, refresh=False, index=None):
+        """Wrapper around Elasticsearch.index."""
         search.get_es().index(
-            document, index=index or cls._get_index(),
-            doc_type=cls._meta.db_table, id=id, bulk=bulk)
+            body=document, index=index or cls._get_index(),
+            doc_type=cls.get_mapping_type(), id=id, refresh=refresh)

     @classmethod
     def unindex(cls, id, index=None):
         es = search.get_es()
         try:
             es.delete(index or cls._get_index(), cls._meta.db_table, id)
-        except pyes.exceptions.NotFoundException:
+        except elasticsearch.TransportError:
             # Item wasn't found, whatevs.
             pass
@@ -316,15 +318,8 @@ class SearchMixin(object):
     def search(cls, index=None):
         return search.ES(cls, index or cls._get_index())

-    # For compatibility with elasticutils > v0.5.
-    # TODO: Remove these when we've moved mkt to its own index.
-    @classmethod
-    def get_index(cls):
-        return cls._get_index()
-
     @classmethod
-    def get_mapping_type_name(cls):
+    def get_mapping_type(cls):
         return cls._meta.db_table
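[Editor's note] Usage of the rewritten wrapper, for reference (the document payload here is made up):

    # Write one document through the official client; refresh=True makes it
    # immediately searchable, replacing the old bulk=True/flush_bulk dance.
    Addon.index({'id': 123, 'name': 'Foo', 'boost': 1.0}, id=123, refresh=True)

    # Deleting a missing document is a no-op: TransportError is swallowed.
    Addon.unindex(123)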
diff --git a/apps/amo/search.py b/apps/amo/search.py
index cc14c4547b..46447eb5f0 100644
--- a/apps/amo/search.py
+++ b/apps/amo/search.py
@@ -3,10 +3,7 @@ import logging
 from django.conf import settings as dj_settings
 from django_statsd.clients import statsd

-from elasticutils import S as EU_S
-from elasticutils.contrib.django import get_es as eu_get_es
-from pyes import ES as pyes_ES
-from pyes import VERSION as PYES_VERSION
+from elasticsearch import Elasticsearch

 log = logging.getLogger('z.es')
@@ -18,60 +15,14 @@ DEFAULT_INDEXES = ['default']
 DEFAULT_DUMP_CURL = None


-# Pulled from elasticutils 0.5 so we can upgrade elasticutils to a newer
-# version which is based on pyelasticsearch and not break AMO.
-def get_es(hosts=None, default_indexes=None, timeout=None, dump_curl=None,
-           **settings):
-    """Create an ES object and return it.
-
-    :arg hosts: list of uris; ES hosts to connect to, defaults to
-        ``['localhost:9200']``
-    :arg default_indexes: list of strings; the default indexes to use,
-        defaults to 'default'
-    :arg timeout: int; the timeout in seconds, defaults to 5
-    :arg dump_curl: function or None; function that dumps curl output,
-        see docs, defaults to None
-    :arg settings: other settings to pass into `pyes.es.ES`
-
-    Examples:
-
-    >>> es = get_es()
-
-    >>> es = get_es(hosts=['localhost:9200'])
-
-    >>> es = get_es(timeout=30)  # good for indexing
-
-    >>> es = get_es(default_indexes=['sumo_prod_20120627']
-
-    >>> class CurlDumper(object):
-    ...     def write(self, text):
-    ...         print text
-    ...
-    >>> es = get_es(dump_curl=CurlDumper())
-
-    """
+def get_es(hosts=None, timeout=None, **settings):
+    """Create an ES object and return it."""
     # Cheap way of de-None-ifying things
     hosts = hosts or getattr(dj_settings, 'ES_HOSTS', DEFAULT_HOSTS)
-    default_indexes = default_indexes or [dj_settings.ES_INDEXES['default']]
     timeout = (timeout if timeout is not None else
                getattr(dj_settings, 'ES_TIMEOUT', DEFAULT_TIMEOUT))
-    dump_curl = dump_curl or getattr(dj_settings, 'DEFAULT_DUMP_CURL', False)
-
-    if not isinstance(default_indexes, list):
-        default_indexes = [default_indexes]
-
-    es = pyes_ES(hosts, default_indexes=default_indexes, timeout=timeout,
-                 dump_curl=dump_curl, **settings)
-
-    # pyes 0.15 does this lame thing where it ignores dump_curl in
-    # the ES constructor and always sets it to None. So what we do
-    # is set it manually after the ES has been created and
-    # defaults['dump_curl'] is truthy. This might not work for all
-    # values of dump_curl.
-    if PYES_VERSION[0:2] == (0, 15) and dump_curl is not None:
-        es.dump_curl = dump_curl
-
-    return es
+    return Elasticsearch(hosts, timeout=timeout, **settings)


 class ES(object):
@@ -112,6 +63,9 @@ class ES(object):
     def facet(self, **kw):
         return self._clone(next_step=('facet', kw.items()))

+    def source(self, *fields):
+        return self._clone(next_step=('source', fields))
+
     def extra(self, **kw):
         new = self._clone()
         actions = 'values values_dict order_by query filter facet'.split()
@@ -147,6 +101,7 @@ class ES(object):
         queries = []
         sort = []
         fields = ['id']
+        source = []
         facets = {}
         as_list = as_dict = False
         for action, value in self.steps:
@@ -169,35 +124,56 @@ class ES(object):
                 queries.extend(self._process_queries(value))
             elif action == 'filter':
                 filters.extend(self._process_filters(value))
+            elif action == 'source':
+                source.extend(value)
             elif action == 'facet':
                 facets.update(value)
             else:
                 raise NotImplementedError(action)

-        qs = {}
-        if len(filters) > 1:
-            qs['filter'] = {'and': filters}
-        elif filters:
-            qs['filter'] = filters[0]
-
         if len(queries) > 1:
-            qs['query'] = {'bool': {'must': queries}}
+            qs = {'bool': {'must': queries}}
         elif queries:
-            qs['query'] = queries[0]
+            qs = queries[0]
+        else:
+            qs = {"match_all": {}}
+
+        qs = {
+            "function_score": {
+                "query": qs,
+                "functions": [{"field_value_factor": {"field": "boost"}}]
+            }
+        }
+
+        if filters:
+            if len(filters) > 1:
+                filters = {"and": filters}
+            qs = {
+                "filtered": {
+                    "query": qs,
+                    "filter": filters
+                }
+            }
+
+        body = {"query": qs}
+        if sort:
+            body['sort'] = sort
+        if self.start:
+            body['from'] = self.start
+        if self.stop is not None:
+            body['size'] = self.stop - self.start
+        if facets:
+            body['facets'] = facets

         if fields:
-            qs['fields'] = fields
-        if facets:
-            qs['facets'] = facets
-        if sort:
-            qs['sort'] = sort
-        if self.start:
-            qs['from'] = self.start
-        if self.stop is not None:
-            qs['size'] = self.stop - self.start
+            body['fields'] = fields
+        # As of version 1.0, ES no longer loads fields that are not stored
+        # from '_source', and non-leaf fields are not allowed in 'fields'.
+        if source:
+            body['_source'] = source

         self.fields, self.as_list, self.as_dict = fields, as_list, as_dict
-        return qs
+        return body

     def _split(self, string):
         if '__' in string:
@@ -261,7 +237,11 @@ class ES(object):
         es = get_es()
         try:
             with statsd.timer('search.es.timer') as timer:
-                hits = es.search(qs, self.index, self.type._meta.db_table)
+                hits = es.search(
+                    body=qs,
+                    index=self.index,
+                    doc_type=self.type._meta.db_table
+                )
         except Exception:
             log.error(qs)
             raise
@@ -317,7 +297,12 @@ class DictSearchResults(SearchResults):
         # Elasticsearch >= 1.0 style.
         for h in hits:
             hit = {}
-            for field, value in h['fields'].items():
+            fields = h['fields']
+            # '_source' is only present when it was explicitly requested,
+            # so merge it into the returned fields.
+            if '_source' in h:
+                fields.update(h['_source'])
+            for field, value in fields.items():
                 if type(value) != list:
                     value = [value]
                 hit[field] = value
@@ -353,43 +338,3 @@ class ObjectSearchResults(SearchResults):
     def __iter__(self):
         objs = dict((obj.id, obj) for obj in self.objects)
         return (objs[id] for id in self.ids if id in objs)
-
-
-class TempS(EU_S):
-    # Temporary class override to mimic ElasticUtils v0.5 behavior.
-    # TODO: Remove this when we've moved mkt to its own index.
-
-    def get_es(self, **kwargs):
-        """Returns the pyelasticsearch ElasticSearch object to use.
-
-        This uses the django get_es builder by default which takes
-        into account settings in ``settings.py``.
-
-        """
-        return super(TempS, self).get_es(default_builder=eu_get_es)
-
-    def _do_search(self):
-        """
-        Perform the search, then convert that raw format into a SearchResults
-        instance and return it.
-        """
-        if not self._results_cache:
-            hits = self.raw()
-            if self.as_list:
-                ResultClass = ListSearchResults
-            elif self.as_dict or self.type is None:
-                ResultClass = DictSearchResults
-            else:
-                ResultClass = ObjectSearchResults
-            self._results_cache = ResultClass(self.type, hits, self.fields)
-        return self._results_cache
-
-    def _build_query(self):
-        query = super(TempS, self)._build_query()
-        if 'fields' in query:
-            if 'id' not in query['fields']:
-                query['fields'].append('id')
-        else:
-            query['fields'] = ['id']
-
-        return query
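[Editor's note] For reference, the body _build_query now hands to Elasticsearch.search for a combined query + filter has roughly this shape (values illustrative; note a single filter stays a one-element list, exactly as the tests below assert):

    body = {
        'query': {
            'filtered': {
                'query': {
                    'function_score': {
                        'query': {'term': {'type': 1}},
                        'functions': [
                            {'field_value_factor': {'field': 'boost'}},
                        ],
                    }
                },
                'filter': [{'term': {'status': 4}}],
            }
        },
        'fields': ['id'],
    }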
diff --git a/apps/amo/tests/__init__.py b/apps/amo/tests/__init__.py
index d45a0c26cf..31b3d3077b 100644
--- a/apps/amo/tests/__init__.py
+++ b/apps/amo/tests/__init__.py
@@ -19,10 +19,7 @@ from django.test.client import Client
 from django.utils import translation

 import caching
-import elasticutils.contrib.django as elasticutils
 import mock
-import pyelasticsearch.exceptions as pyelasticsearch
-import pyes.exceptions as pyes
 import test_utils
 import tower
 from dateutil.parser import parse as dateutil_parser
@@ -201,21 +198,7 @@ class TestClient(Client):
             raise AttributeError


-ES_patchers = [mock.patch('amo.search.get_es', spec=True),
-               mock.patch('elasticutils.contrib.django', spec=True)]
-
-
-def start_es_mock():
-    for patch in ES_patchers:
-        patch.start()
-
-
-def stop_es_mock():
-    for patch in ES_patchers:
-        patch.stop()
-
-    if hasattr(elasticutils, '_local') and hasattr(elasticutils._local, 'es'):
-        delattr(elasticutils._local, 'es')
+Mocked_ES = mock.patch('amo.search.get_es', spec=True)


 def mock_es(f):
@@ -225,11 +208,11 @@ def mock_es(f):
     """
     @wraps(f)
     def decorated(request, *args, **kwargs):
-        start_es_mock()
+        Mocked_ES.start()
        try:
            return f(request, *args, **kwargs)
        finally:
-            stop_es_mock()
+            Mocked_ES.stop()
    return decorated
@@ -243,7 +226,7 @@ class MockEsMixin(object):
     @classmethod
     def setUpClass(cls):
         if cls.mock_es:
-            start_es_mock()
+            Mocked_ES.start()
         try:
             reset.send(None)  # Reset all the ES tasks on hold.
             super(MockEsMixin, cls).setUpClass()
@@ -251,7 +234,7 @@ class MockEsMixin(object):
             # We need to unpatch here because tearDownClass will not be
             # called.
             if cls.mock_es:
-                stop_es_mock()
+                Mocked_ES.stop()
             raise

     @classmethod
@@ -260,7 +243,7 @@ class MockEsMixin(object):
             super(MockEsMixin, cls).tearDownClass()
         finally:
             if cls.mock_es:
-                stop_es_mock()
+                Mocked_ES.stop()


 class TestCase(MockEsMixin, RedisTest, test_utils.TestCase):
@@ -736,7 +719,7 @@ class ESTestCase(TestCase):

         super(ESTestCase, cls).setUpClass()
         try:
-            cls.es.cluster_health()
+            cls.es.cluster.health()
         except Exception, e:
             e.args = tuple([u'%s (it looks like ES is not running, '
                             'try starting it or set RUN_ES_TESTS=False)'
@@ -750,27 +733,10 @@ class ESTestCase(TestCase):
         }

         for index in set(settings.ES_INDEXES.values()):
-            # Get the index that's pointed to by the alias.
-            try:
-                indices = cls.es.get_alias(index)
-                index = indices[0]
-            except IndexError:
-                # There's no alias, just use the index.
-                print 'Found no alias for %s.' % index
-                index = index
-            except (pyes.IndexMissingException,
-                    pyelasticsearch.ElasticHttpNotFoundError):
-                pass
+            cls.es.indices.delete(index, ignore=[404])

-            # Remove any alias as well.
-            try:
-                cls.es.delete_index(index)
-            except (pyes.IndexMissingException,
-                    pyelasticsearch.ElasticHttpNotFoundError) as exc:
-                print 'Could not delete index %r: %s' % (index, exc)
-
-        addons.search.setup_mapping()
-        stats.search.setup_indexes()
+        addons.search.create_new_index()
+        stats.search.create_new_index()

     @classmethod
     def tearDownClass(cls):
@@ -799,7 +765,7 @@ class ESTestCase(TestCase):
     @classmethod
     def refresh(cls, index='default', timesleep=0):
         process.send(None)
-        cls.es.refresh(settings.ES_INDEXES[index], timesleep=timesleep)
+        cls.es.indices.refresh(settings.ES_INDEXES[index])

     @classmethod
     def reindex(cls, model, index='default'):
@@ -817,3 +783,10 @@ class ESTestCase(TestCase):
             addon_factory(),
             addon_factory(),
         ]
+
+    @classmethod
+    def empty_index(cls, index):
+        cls.es.delete_by_query(
+            settings.ES_INDEXES[index],
+            body={"query": {"match_all": {}}}
+        )
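[Editor's note] A sketch of the test flow the two helpers above enable (helper names from this patch; the test body itself is made up):

    class TestSomething(amo.tests.ESTestCase):

        def test_count(self):
            amo.tests.addon_factory()
            # indices.refresh on the aliased index, so the doc is visible.
            self.refresh('default')
            assert Addon.search().count()
            # delete_by_query(match_all) wipes the index between tests.
            self.empty_index('default')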
diff --git a/apps/amo/tests/test_search.py b/apps/amo/tests/test_search.py
index 3b84bd0ab6..7dd7ebb977 100644
--- a/apps/amo/tests/test_search.py
+++ b/apps/amo/tests/test_search.py
@@ -60,121 +60,100 @@ class TestES(amo.tests.ESTestCase):
         # Doing a filter creates a new ES object.
         qs = Addon.search()
         qs2 = qs.filter(type=1)
-        eq_(qs._build_query(), {'fields': ['id']})
-        eq_(qs2._build_query(), {'fields': ['id'],
-                                 'filter': {'term': {'type': 1}}})
+        assert 'filtered' not in qs._build_query()['query']
+        assert 'filtered' in qs2._build_query()['query']

     def test_filter(self):
         qs = Addon.search().filter(type=1)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'term': {'type': 1}}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'term': {'type': 1}}])

     def test_in_filter(self):
         qs = Addon.search().filter(type__in=[1, 2])
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'in': {'type': [1, 2]}}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'in': {'type': [1, 2]}}])

     def test_and(self):
         qs = Addon.search().filter(type=1, category__in=[1, 2])
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'term': {'type': 1}},
-                                    {'in': {'category': [1, 2]}},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [{'term': {'type': 1}}, {'in': {'category': [1, 2]}}]})

     def test_query(self):
         qs = Addon.search().query(type=1)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'term': {'type': 1}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'term': {'type': 1}})

     def test_query_match(self):
         qs = Addon.search().query(name__match='woo woo')
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'match': {'name': 'woo woo'}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'match': {'name': 'woo woo'}})

     def test_query_multiple_and_range(self):
         qs = Addon.search().query(type=1, status__gte=1)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'bool': {'must': [
-                                    {'term': {'type': 1}},
-                                    {'range': {'status': {'gte': 1}}},
-                                ]}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'bool': {'must': [{'term': {'type': 1}},
+                               {'range': {'status': {'gte': 1}}}, ]}})

     def test_query_or(self):
         qs = Addon.search().query(or_=dict(type=1, status__gte=2))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'bool': {'should': [
-                                    {'term': {'type': 1}},
-                                    {'range': {'status': {'gte': 2}}},
-                                ]}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'bool': {'should': [{'term': {'type': 1}},
+                                 {'range': {'status': {'gte': 2}}}, ]}})

     def test_query_or_and(self):
         qs = Addon.search().query(or_=dict(type=1, status__gte=2), category=2)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'bool': {'must': [
-                                    {'term': {'category': 2}},
-                                    {'bool': {'should': [
-                                        {'term': {'type': 1}},
-                                        {'range': {'status': {'gte': 2}}},
-                                    ]}}
-                                ]}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'bool': {'must': [{'term': {'category': 2}},
+                               {'bool': {'should': [
+                                   {'term': {'type': 1}},
+                                   {'range': {'status': {'gte': 2}}}, ]}}]}})

     def test_query_fuzzy(self):
         fuzz = {'boost': 2, 'value': 'woo'}
         qs = Addon.search().query(or_=dict(type=1, status__fuzzy=fuzz))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'bool': {'should': [
-                                    {'fuzzy': {'status': fuzz}},
-                                    {'term': {'type': 1}},
-                                ]}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'bool': {'should': [{'fuzzy': {'status': fuzz}},
+                                 {'term': {'type': 1}}, ]}})

     def test_order_by_desc(self):
         qs = Addon.search().order_by('-rating')
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'sort': [{'rating': 'desc'}]})
+        eq_(qs._build_query()['sort'], [{'rating': 'desc'}])

     def test_order_by_asc(self):
         qs = Addon.search().order_by('rating')
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'sort': ['rating']})
+        eq_(qs._build_query()['sort'], ['rating'])

     def test_order_by_multiple(self):
         qs = Addon.search().order_by('-rating', 'id')
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'sort': [{'rating': 'desc'}, 'id']})
+        eq_(qs._build_query()['sort'], [{'rating': 'desc'}, 'id'])

     def test_slice(self):
         qs = Addon.search()[5:12]
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'from': 5,
-                                'size': 7})
+        eq_(qs._build_query()['from'], 5)
+        eq_(qs._build_query()['size'], 7)

     def test_filter_or(self):
         qs = Addon.search().filter(type=1).filter(or_=dict(status=1, app=2))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'term': {'type': 1}},
-                                    {'or': [{'term': {'status': 1}},
-                                            {'term': {'app': 2}}]},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [
                {'term': {'type': 1}},
                {'or': [{'term': {'status': 1}}, {'term': {'app': 2}}]},
            ]})

         qs = Addon.search().filter(type=1, or_=dict(status=1, app=2))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'term': {'type': 1}},
-                                    {'or': [{'term': {'status': 1}},
-                                            {'term': {'app': 2}}]},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [
                {'term': {'type': 1}},
                {'or': [{'term': {'status': 1}}, {'term': {'app': 2}}]},
            ]})

     def test_slice_stop(self):
         qs = Addon.search()[:6]
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'size': 6})
+        eq_(qs._build_query()['size'], 6)

     def test_slice_stop_zero(self):
         qs = Addon.search()[:0]
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'size': 0})
+        eq_(qs._build_query()['size'], 0)

     def test_getitem(self):
         addons = list(Addon.search())
@@ -200,55 +179,54 @@ def test_gte(self):
         qs = Addon.search().filter(type__in=[1, 2], status__gte=4)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'in': {'type': [1, 2]}},
-                                    {'range': {'status': {'gte': 4}}},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [
+                {'in': {'type': [1, 2]}},
+                {'range': {'status': {'gte': 4}}},
+            ]})

     def test_lte(self):
         qs = Addon.search().filter(type__in=[1, 2], status__lte=4)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'in': {'type': [1, 2]}},
-                                    {'range': {'status': {'lte': 4}}},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [
+                {'in': {'type': [1, 2]}},
+                {'range': {'status': {'lte': 4}}},
+            ]})

     def test_gt(self):
         qs = Addon.search().filter(type__in=[1, 2], status__gt=4)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'in': {'type': [1, 2]}},
-                                    {'range': {'status': {'gt': 4}}},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [
+                {'in': {'type': [1, 2]}},
+                {'range': {'status': {'gt': 4}}},
+            ]})

     def test_lt(self):
         qs = Addon.search().filter(type__in=[1, 2], status__lt=4)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'range': {'status': {'lt': 4}}},
-                                    {'in': {'type': [1, 2]}},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [
+                {'range': {'status': {'lt': 4}}},
+                {'in': {'type': [1, 2]}},
+            ]})

     def test_lt2(self):
         qs = Addon.search().filter(status__lt=4)
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'range': {'status': {'lt': 4}}}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'range': {'status': {'lt': 4}}}])

     def test_range(self):
         qs = Addon.search().filter(date__range=('a', 'b'))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'range': {'date': {'gte': 'a',
-                                                              'lte': 'b'}}}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'range': {'date': {'gte': 'a', 'lte': 'b'}}}])

     def test_prefix(self):
         qs = Addon.search().query(name__startswith='woo')
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'prefix': {'name': 'woo'}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'prefix': {'name': 'woo'}})

     def test_values(self):
         qs = Addon.search().values('name')
-        eq_(qs._build_query(), {'fields': ['id', 'name']})
+        eq_(qs._build_query()['fields'], ['id', 'name'])

     def test_values_result(self):
         addons = [([a.id], [a.slug]) for a in self._addons]
@@ -257,11 +235,11 @@ def test_values_dict(self):
         qs = Addon.search().values_dict('name')
-        eq_(qs._build_query(), {'fields': ['id', 'name']})
+        eq_(qs._build_query()['fields'], ['id', 'name'])

     def test_empty_values_dict(self):
         qs = Addon.search().values_dict()
-        eq_(qs._build_query(), {})
+        assert 'fields' not in qs._build_query()

     def test_values_dict_result(self):
         addons = [{'id': [a.id], 'slug': [a.slug]} for a in self._addons]
@@ -289,74 +267,67 @@ def test_extra_values(self):
         qs = Addon.search().extra(values=['name'])
-        eq_(qs._build_query(), {'fields': ['id', 'name']})
+        eq_(qs._build_query()['fields'], ['id', 'name'])

         qs = Addon.search().values('status').extra(values=['name'])
-        eq_(qs._build_query(), {'fields': ['id', 'status', 'name']})
+        eq_(qs._build_query()['fields'], ['id', 'status', 'name'])

     def test_extra_values_dict(self):
         qs = Addon.search().extra(values_dict=['name'])
-        eq_(qs._build_query(), {'fields': ['id', 'name']})
+        eq_(qs._build_query()['fields'], ['id', 'name'])

         qs = Addon.search().values_dict('status').extra(values_dict=['name'])
-        eq_(qs._build_query(), {'fields': ['id', 'status', 'name']})
+        eq_(qs._build_query()['fields'], ['id', 'status', 'name'])

     def test_extra_order_by(self):
         qs = Addon.search().extra(order_by=['-rating'])
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'sort': [{'rating': 'desc'}]})
+        eq_(qs._build_query()['sort'], [{'rating': 'desc'}])

         qs = Addon.search().order_by('-id').extra(order_by=['-rating'])
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'sort': [{'id': 'desc'},
-                                         {'rating': 'desc'}]})
+        eq_(qs._build_query()['sort'], [{'id': 'desc'}, {'rating': 'desc'}])

     def test_extra_query(self):
         qs = Addon.search().extra(query={'type': 1})
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'term': {'type': 1}}})
+        eq_(qs._build_query()['query']['function_score']['query'],
+            {'term': {'type': 1}})

         qs = Addon.search().filter(status=1).extra(query={'type': 1})
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'query': {'term': {'type': 1}},
-                                'filter': {'term': {'status': 1}}})
+        filtered = qs._build_query()['query']['filtered']
+        eq_(filtered['query']['function_score']['query'], {'term': {'type': 1}})
+        eq_(filtered['filter'], [{'term': {'status': 1}}])

     def test_extra_filter(self):
         qs = Addon.search().extra(filter={'category__in': [1, 2]})
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'in': {'category': [1, 2]}}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'in': {'category': [1, 2]}}])

         qs = (Addon.search().filter(type=1)
               .extra(filter={'category__in': [1, 2]}))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'term': {'type': 1}},
-                                    {'in': {'category': [1, 2]}},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [{'term': {'type': 1}}, {'in': {'category': [1, 2]}}, ]})

     def test_extra_filter_or(self):
         qs = Addon.search().extra(filter={'or_': {'status': 1, 'app': 2}})
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'or': [
-                                    {'term': {'status': 1}},
-                                    {'term': {'app': 2}}]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'or': [{'term': {'status': 1}}, {'term': {'app': 2}}]}])

         qs = (Addon.search().filter(type=1)
               .extra(filter={'or_': {'status': 1, 'app': 2}}))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'and': [
-                                    {'term': {'type': 1}},
-                                    {'or': [{'term': {'status': 1}},
-                                            {'term': {'app': 2}}]},
-                                ]}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            {'and': [{'term': {'type': 1}},
+                     {'or': [{'term': {'status': 1}}, {'term': {'app': 2}}]}]})

     def test_facet_range(self):
         facet = {'range': {'status': [{'lte': 3}, {'gte': 5}]}}
         # Pass a copy so edits aren't propagated back here.
         qs = Addon.search().filter(app=1).facet(by_status=dict(facet))
-        eq_(qs._build_query(), {'fields': ['id'],
-                                'filter': {'term': {'app': 1}},
-                                'facets': {'by_status': facet}})
+        eq_(qs._build_query()['query']['filtered']['filter'],
+            [{'term': {'app': 1}}])
+        eq_(qs._build_query()['facets'], {'by_status': facet})
+
+    def test_source(self):
+        qs = Addon.search().source('versions')
+        eq_(qs._build_query()['_source'], ['versions'])


 class TestPaginator(amo.tests.ESTestCase):
diff --git a/apps/amo/utils.py b/apps/amo/utils.py
index 6549e72240..8700e0ccfc 100644
--- a/apps/amo/utils.py
+++ b/apps/amo/utils.py
@@ -35,10 +35,8 @@ from django.utils.functional import Promise
 from django.utils.http import urlquote

 import bleach
-import elasticutils.contrib.django as elasticutils
 import html5lib
 import jinja2
-import pyes.exceptions as pyes
 import pytz
 from babel import Locale
 from cef import log_cef as _log_cef
@@ -123,7 +121,7 @@ def paginate(request, queryset, per_page=20, count=None):
     ``.count()`` on the queryset. This can be good if the queryset would
     produce an expensive count query.
     """
-    p = (ESPaginator if isinstance(queryset, (amo.search.ES, elasticutils.S))
+    p = (ESPaginator if isinstance(queryset, amo.search.ES)
          else paginator.Paginator)(queryset, per_page)

     if count is not None:
@@ -919,59 +917,6 @@ def rm_local_tmp_file(path):
     return os.unlink(path)


-def timestamp_index(index):
-    """Returns index-YYYYMMDDHHMMSS with the current time."""
-    return '%s-%s' % (index, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
-
-
-def create_es_index_if_missing(index, config=None, aliased=False):
-    """Creates an index if it's not present.
-
-    Returns the index name. It may change if it was aliased.
-
-    Options:
-
-    - index: name of the index.
-    - config: if provided, used as the settings option for the
-      ES calls.
-    - aliased: If set to true, the index is suffixed with a timestamp
-      and an alias with the index name is created.
-    """
-    es = amo.search.get_es()
-
-    if aliased:
-        alias = index
-        try:
-            indices = es.get_alias(alias)
-            if len(indices) > 1:
-                raise ValueError("The %r alias should not point to "
-                                 "several indices" % index)
-            # we're good here - the alias and the index exist
-            return indices[0]
-        except pyes.IndexMissingException:
-            # no alias exists, so we want to
-            # create a fresh one and a fresh index
-            index = timestamp_index(index)
-
-    if settings.IN_TEST_SUITE:
-        if not config:
-            config = {}
-        # Be nice to ES running on ci.mozilla.org
-        config.update({'number_of_shards': 3,
-                       'number_of_replicas': 0})
-    try:
-        es.create_index_if_missing(index, settings=config)
-        if aliased:
-            try:
-                es.add_alias(alias, [index])
-            except pyes.ElasticSearchException, exc:
-                log.info('ES error creating alias: %s' % exc)
-    except pyes.ElasticSearchException, exc:
-        log.info('ES error creating index: %s' % exc)
-
-    return index
-
-
 def timer(*func, **kwargs):
     """
     Outputs statsd timings for the decorated method, ignored if not
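[Editor's note] timestamp_index is only being removed from amo.utils here; the reindex command later in this diff imports it from lib.es.utils instead. Presumably it keeps the exact behavior shown in the deleted code:

    def timestamp_index(index):
        """Returns index-YYYYMMDDHHMMSS with the current time."""
        return '%s-%s' % (index,
                          datetime.datetime.now().strftime('%Y%m%d%H%M%S'))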
- """ - es = amo.search.get_es() - - if aliased: - alias = index - try: - indices = es.get_alias(alias) - if len(indices) > 1: - raise ValueError("The %r alias should not point to " - "several indices" % index) - # we're good here - the alias and the index exist - return indices[0] - except pyes.IndexMissingException: - # no alias exists, so we want to - # create a fresh one and a fresh index - index = timestamp_index(index) - - if settings.IN_TEST_SUITE: - if not config: - config = {} - # Be nice to ES running on ci.mozilla.org - config.update({'number_of_shards': 3, - 'number_of_replicas': 0}) - try: - es.create_index_if_missing(index, settings=config) - if aliased: - try: - es.add_alias(alias, [index]) - except pyes.ElasticSearchException, exc: - log.info('ES error creating alias: %s' % exc) - except pyes.ElasticSearchException, exc: - log.info('ES error creating index: %s' % exc) - - return index - - def timer(*func, **kwargs): """ Outputs statsd timings for the decorated method, ignored if not diff --git a/apps/bandwagon/cron.py b/apps/bandwagon/cron.py index 4dbe34b123..2291c84cb8 100644 --- a/apps/bandwagon/cron.py +++ b/apps/bandwagon/cron.py @@ -149,7 +149,7 @@ def _drop_collection_recs(**kw): @cronjobs.register -def reindex_collections(index=None, aliased=True): +def reindex_collections(index=None): from . import tasks ids = (Collection.objects.exclude(type=amo.COLLECTION_SYNCHRONIZED) .values_list('id', flat=True)) diff --git a/apps/bandwagon/search.py b/apps/bandwagon/search.py index f6db89800b..e51d0a4ee3 100644 --- a/apps/bandwagon/search.py +++ b/apps/bandwagon/search.py @@ -19,11 +19,11 @@ def extract(collection): in translations[collection.description_id])) # Boost by the number of subscribers. - d['_boost'] = collection.subscribers ** .2 + d['boost'] = collection.subscribers ** .2 # Double the boost if the collection is public. if collection.listed: - d['_boost'] = max(d['_boost'], 1) * 4 + d['boost'] = max(d['boost'], 1) * 4 # Indices for each language. languages is a list of locales we want to # index with analyzer if the string's locale matches. diff --git a/apps/bandwagon/tasks.py b/apps/bandwagon/tasks.py index 38957e915e..11131e32dc 100644 --- a/apps/bandwagon/tasks.py +++ b/apps/bandwagon/tasks.py @@ -4,7 +4,6 @@ import math from django.core.files.storage import default_storage as storage from django.db.models import Count -import elasticutils.contrib.django as elasticutils from celeryutils import task import amo diff --git a/apps/compat/cron.py b/apps/compat/cron.py index f9d09bdf03..2cd4afd573 100644 --- a/apps/compat/cron.py +++ b/apps/compat/cron.py @@ -20,7 +20,7 @@ log = logging.getLogger('z.compat') @cronjobs.register -def compatibility_report(index=None, aliased=True): +def compatibility_report(index=None): docs = defaultdict(dict) indices = get_indices(index) @@ -123,5 +123,6 @@ def compatibility_report(index=None, aliased=True): for chunk in amo.utils.chunked(docs.values(), 150): for doc in chunk: for index in indices: - AppCompat.index(doc, id=doc['id'], bulk=True, index=index) - amo.search.get_es().flush_bulk(forced=True) + AppCompat.index(doc, id=doc['id'], refresh=False, index=index) + es = amo.search.get_es() + es.indices.refresh() diff --git a/apps/constants/search.py b/apps/constants/search.py index 0f427155b9..1730bd550f 100644 --- a/apps/constants/search.py +++ b/apps/constants/search.py @@ -65,7 +65,6 @@ SEARCH_ANALYZER_PLUGINS = [ 'polish', ] - # Which stemmer to use for each langauge. 
# # Note: We use the keys of this dict for supported stop words, also, which is diff --git a/apps/editors/tests/test_views_themes.py b/apps/editors/tests/test_views_themes.py index 50bea16b24..088f063551 100644 --- a/apps/editors/tests/test_views_themes.py +++ b/apps/editors/tests/test_views_themes.py @@ -600,6 +600,7 @@ class TestThemeQueueRereview(ThemeReviewTestMixin, amo.tests.TestCase): assert (copy_mock2.call_args_list[0][0][1] .endswith('preview_large.jpg')) + class TestDeletedThemeLookup(amo.tests.TestCase): fixtures = ['base/users', 'editors/user_persona_reviewer', 'editors/user_senior_persona_reviewer'] @@ -627,6 +628,7 @@ class TestThemeSearch(amo.tests.ESTestCase): def setUp(self): self.addon = addon_factory(type=amo.ADDON_PERSONA, name='themeteam', status=amo.STATUS_PENDING) + self.refresh('default') def search(self, q, flagged=False, rereview=False): get_query = {'q': q, 'queue_type': ('rereview' if rereview else @@ -643,11 +645,13 @@ class TestThemeSearch(amo.tests.ESTestCase): def test_flagged(self): self.addon.update(status=amo.STATUS_REVIEW_PENDING) + self.refresh('default') eq_(self.search('theme', flagged=True)[0]['id'], self.addon.id) def test_rereview(self): RereviewQueueTheme.objects.create(theme=self.addon.persona) self.addon.save() + self.refresh('default') eq_(self.search('theme', rereview=True)[0]['id'], self.addon.id) diff --git a/apps/editors/views_themes.py b/apps/editors/views_themes.py index 1a89c21dd8..c6f100a762 100644 --- a/apps/editors/views_themes.py +++ b/apps/editors/views_themes.py @@ -15,7 +15,7 @@ import constants.editors as rvw from access import acl from addons.models import Addon, Persona from amo.decorators import json_view, post_required -from amo.search import TempS +from amo.search import ES from amo.urlresolvers import reverse from amo.utils import days_ago, paginate from devhub.models import ActivityLog @@ -199,7 +199,7 @@ def themes_search(request): flagged = search_form.cleaned_data['queue_type'] == 'flagged' # ES query on name. 
- themes = TempS(Addon).filter(type=amo.ADDON_PERSONA) + themes = Addon.search().filter(type=amo.ADDON_PERSONA) if rereview: themes = themes.filter(has_theme_rereview=True) else: diff --git a/apps/search/middleware.py b/apps/search/middleware.py index 16dca80082..788670a5e1 100644 --- a/apps/search/middleware.py +++ b/apps/search/middleware.py @@ -2,8 +2,7 @@ import logging from django.shortcuts import render -from pyes.exceptions import ElasticSearchException -from pyes.urllib3.connectionpool import HTTPError +from elasticsearch import TransportError log = logging.getLogger('z.es') @@ -12,7 +11,6 @@ log = logging.getLogger('z.es') class ElasticsearchExceptionMiddleware(object): def process_exception(self, request, exception): - if (issubclass(exception.__class__, (ElasticSearchException, - HTTPError))): + if issubclass(exception.__class__, TransportError): log.error(u'Elasticsearch error: %s' % exception) return render(request, 'search/down.html', status=503) diff --git a/apps/search/tests/test_middleware.py b/apps/search/tests/test_middleware.py index 73c0b72eaf..500358ba3e 100644 --- a/apps/search/tests/test_middleware.py +++ b/apps/search/tests/test_middleware.py @@ -1,8 +1,7 @@ import mock from nose.tools import eq_ -from pyes.exceptions import ElasticSearchException, IndexMissingException -from pyes.urllib3.connectionpool import HTTPError, MaxRetryError, TimeoutError from test_utils import RequestFactory +from elasticsearch import TransportError import amo.tests from search.middleware import ElasticsearchExceptionMiddleware as ESM @@ -15,19 +14,10 @@ class TestElasticsearchExceptionMiddleware(amo.tests.TestCase): @mock.patch('search.middleware.render') def test_exceptions_we_catch(self, render_mock): - # These are instantiated with an error string. - for e in [ElasticSearchException, IndexMissingException]: - ESM().process_exception(self.request, e('ES ERROR')) - render_mock.assert_called_with(self.request, 'search/down.html', - status=503) - render_mock.reset_mock() - - # These are just Exception classes. 
- for e in [HTTPError, MaxRetryError, TimeoutError]: - ESM().process_exception(self.request, e('ES ERROR')) - render_mock.assert_called_with(self.request, 'search/down.html', - status=503) - render_mock.reset_mock() + ESM().process_exception(self.request, TransportError(400, 'ES ERROR')) + render_mock.assert_called_with(self.request, 'search/down.html', + status=503) + render_mock.reset_mock() @mock.patch('search.middleware.render') def test_exceptions_we_do_not_catch(self, render_mock): diff --git a/apps/search/tests/test_views.py b/apps/search/tests/test_views.py index 58fe97d211..1037203b22 100644 --- a/apps/search/tests/test_views.py +++ b/apps/search/tests/test_views.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- import json -import mock import urlparse from django.conf import settings diff --git a/apps/stats/cron.py b/apps/stats/cron.py index bdc7e36016..e35b1101ad 100644 --- a/apps/stats/cron.py +++ b/apps/stats/cron.py @@ -85,14 +85,14 @@ def addon_total_contributions(): @cronjobs.register -def index_latest_stats(index=None, aliased=True): +def index_latest_stats(index=None): raise_if_reindex_in_progress('amo') + fmt = lambda d: d.strftime('%Y-%m-%d') latest = UpdateCount.search(index).order_by('-date').values_dict() if latest: latest = latest[0]['date'] else: - latest = datetime.date.today() - datetime.timedelta(days=1) - fmt = lambda d: d.strftime('%Y-%m-%d') - date_range = '%s:%s' % (fmt(latest), fmt(datetime.date.today())) + latest = fmt(datetime.date.today() - datetime.timedelta(days=1)) + date_range = '%s:%s' % (latest, fmt(datetime.date.today())) cron_log.info('index_stats --date=%s' % date_range) call_command('index_stats', addons=None, date=date_range) diff --git a/apps/stats/models.py b/apps/stats/models.py index 490f57ba6c..270de83f22 100644 --- a/apps/stats/models.py +++ b/apps/stats/models.py @@ -37,7 +37,12 @@ class AddonCollectionCount(models.Model): db_table = 'stats_addons_collections_counts' -class CollectionCount(SearchMixin, models.Model): +class StatsSearchMixin(SearchMixin): + + ES_ALIAS_KEY = 'stats' + + +class CollectionCount(StatsSearchMixin, models.Model): collection = models.ForeignKey('bandwagon.Collection') count = models.PositiveIntegerField() date = models.DateField() @@ -57,7 +62,7 @@ class CollectionStats(models.Model): db_table = 'stats_collections' -class DownloadCount(SearchMixin, models.Model): +class DownloadCount(StatsSearchMixin, models.Model): addon = models.ForeignKey('addons.Addon') count = models.PositiveIntegerField() date = models.DateField() @@ -68,7 +73,7 @@ class DownloadCount(SearchMixin, models.Model): # TODO: remove when the script is proven to work correctly. -class DownloadCountTmp(SearchMixin, models.Model): +class DownloadCountTmp(StatsSearchMixin, models.Model): addon = models.ForeignKey('addons.Addon') count = models.PositiveIntegerField() date = models.DateField() @@ -78,7 +83,7 @@ class DownloadCountTmp(SearchMixin, models.Model): db_table = 'download_counts_tmp' -class UpdateCount(SearchMixin, models.Model): +class UpdateCount(StatsSearchMixin, models.Model): addon = models.ForeignKey('addons.Addon') count = models.PositiveIntegerField() date = models.DateField() @@ -93,7 +98,7 @@ class UpdateCount(SearchMixin, models.Model): # TODO: remove when the script is proven to work correctly. 
-class UpdateCountTmp(SearchMixin, models.Model): +class UpdateCountTmp(StatsSearchMixin, models.Model): addon = models.ForeignKey('addons.Addon') count = models.PositiveIntegerField() date = models.DateField() @@ -123,7 +128,7 @@ class ThemeUpdateCountManager(models.Manager): return dict((d['addon_id'], d['avg']) for d in averages) -class ThemeUpdateCount(SearchMixin, models.Model): +class ThemeUpdateCount(StatsSearchMixin, models.Model): """Daily users taken from the ADI data (coming from Hive).""" addon = models.ForeignKey('addons.Addon') count = models.PositiveIntegerField() @@ -384,7 +389,7 @@ class ClientData(models.Model): 'is_chromeless', 'language', 'region') -class ThemeUserCount(SearchMixin, models.Model): +class ThemeUserCount(StatsSearchMixin, models.Model): """Theme popularity (weekly average of users). This is filled in by a cron job reading the popularity from the theme diff --git a/apps/stats/search.py b/apps/stats/search.py index e93f53e52a..a477d3f55d 100644 --- a/apps/stats/search.py +++ b/apps/stats/search.py @@ -1,10 +1,14 @@ import collections +from django.conf import settings +from django.core.management import call_command + import amo import amo.search -from amo.utils import create_es_index_if_missing from applications.models import AppVersion -from stats.models import CollectionCount, DownloadCount, UpdateCount +from lib.es.utils import create_index +from stats.models import (CollectionCount, DownloadCount, StatsSearchMixin, + UpdateCount) def es_dict(items): @@ -123,24 +127,42 @@ def get_all_app_versions(): return dict(rv) -def setup_indexes(index=None, aliased=True): - es = amo.search.get_es() - for model in CollectionCount, DownloadCount, UpdateCount: - index = index or model._get_index() - index = create_es_index_if_missing(index, aliased=aliased) +def get_alias(): + return settings.ES_INDEXES.get(StatsSearchMixin.ES_ALIAS_KEY) - mapping = { - 'properties': { - 'id': {'type': 'long'}, - 'count': {'type': 'long'}, - 'data': {'dynamic': 'true', - 'properties': { - 'v': {'type': 'long'}, - 'k': {'type': 'string'} - } - }, - 'date': {'format': 'dateOptionalTime', - 'type': 'date'} + +def create_new_index(index=None, config=None): + if config is None: + config = {} + if index is None: + index = get_alias() + config['mappings'] = get_mappings() + create_index(index, config) + + +def reindex(index): + call_command('index_stats') + + +def get_mappings(): + mapping = { + 'properties': { + 'id': {'type': 'long'}, + 'boost': {'type': 'float', 'null_value': 1.0}, + 'count': {'type': 'long'}, + 'data': { + 'dynamic': 'true', + 'properties': { + 'v': {'type': 'long'}, + 'k': {'type': 'string'} + } + }, + 'date': { + 'format': 'dateOptionalTime', + 'type': 'date' } } - es.put_mapping(model._meta.db_table, mapping, index) + } + + models = (CollectionCount, DownloadCount, UpdateCount) + return dict((m._meta.db_table, mapping) for m in models) diff --git a/apps/stats/tasks.py b/apps/stats/tasks.py index 4fd26a5fdd..0b289316ce 100644 --- a/apps/stats/tasks.py +++ b/apps/stats/tasks.py @@ -289,8 +289,8 @@ def index_update_counts(ids, **kw): key = '%s-%s' % (update.addon_id, update.date) data = search.extract_update_count(update) for index in indices: - UpdateCount.index(data, bulk=True, id=key, index=index) - es.flush_bulk(forced=True) + UpdateCount.index(data, refresh=False, id=key, index=index) + es.indices.refresh(index) except Exception, exc: index_update_counts.retry(args=[ids], exc=exc, **kw) raise @@ -310,9 +310,9 @@ def index_download_counts(ids, **kw): key = '%s-%s' % 
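[Editor's note] The StatsSearchMixin above only changes which key of settings.ES_INDEXES the stats models resolve through. An illustrative shape of that setting under this patch (the concrete index names are deployment-specific assumptions):

    ES_INDEXES = {
        'default': 'addons',      # Addon, AppCompat, Collection, UserProfile
        'stats': 'addons_stats',  # DownloadCount, UpdateCount, ...
    }

    # So UpdateCount._get_index() == ES_INDEXES['stats'], and the test
    # suite's self.refresh('stats') refreshes only the stats index.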
diff --git a/apps/stats/search.py b/apps/stats/search.py
index e93f53e52a..a477d3f55d 100644
--- a/apps/stats/search.py
+++ b/apps/stats/search.py
@@ -1,10 +1,14 @@
 import collections

+from django.conf import settings
+from django.core.management import call_command
+
 import amo
 import amo.search
-from amo.utils import create_es_index_if_missing
 from applications.models import AppVersion
-from stats.models import CollectionCount, DownloadCount, UpdateCount
+from lib.es.utils import create_index
+from stats.models import (CollectionCount, DownloadCount, StatsSearchMixin,
+                          UpdateCount)


 def es_dict(items):
@@ -123,24 +127,42 @@ def get_all_app_versions():
     return dict(rv)


-def setup_indexes(index=None, aliased=True):
-    es = amo.search.get_es()
-    for model in CollectionCount, DownloadCount, UpdateCount:
-        index = index or model._get_index()
-        index = create_es_index_if_missing(index, aliased=aliased)
+def get_alias():
+    return settings.ES_INDEXES.get(StatsSearchMixin.ES_ALIAS_KEY)

-        mapping = {
-            'properties': {
-                'id': {'type': 'long'},
-                'count': {'type': 'long'},
-                'data': {'dynamic': 'true',
-                         'properties': {
-                             'v': {'type': 'long'},
-                             'k': {'type': 'string'}
-                         }
-                },
-                'date': {'format': 'dateOptionalTime',
-                         'type': 'date'}
+
+def create_new_index(index=None, config=None):
+    if config is None:
+        config = {}
+    if index is None:
+        index = get_alias()
+    config['mappings'] = get_mappings()
+    create_index(index, config)
+
+
+def reindex(index):
+    call_command('index_stats')
+
+
+def get_mappings():
+    mapping = {
+        'properties': {
+            'id': {'type': 'long'},
+            'boost': {'type': 'float', 'null_value': 1.0},
+            'count': {'type': 'long'},
+            'data': {
+                'dynamic': 'true',
+                'properties': {
+                    'v': {'type': 'long'},
+                    'k': {'type': 'string'}
+                }
+            },
+            'date': {
+                'format': 'dateOptionalTime',
+                'type': 'date'
             }
         }
-        es.put_mapping(model._meta.db_table, mapping, index)
+    }
+
+    models = (CollectionCount, DownloadCount, UpdateCount)
+    return dict((m._meta.db_table, mapping) for m in models)
diff --git a/apps/stats/tasks.py b/apps/stats/tasks.py
index 4fd26a5fdd..0b289316ce 100644
--- a/apps/stats/tasks.py
+++ b/apps/stats/tasks.py
@@ -289,8 +289,8 @@ def index_update_counts(ids, **kw):
             key = '%s-%s' % (update.addon_id, update.date)
             data = search.extract_update_count(update)
             for index in indices:
-                UpdateCount.index(data, bulk=True, id=key, index=index)
-        es.flush_bulk(forced=True)
+                UpdateCount.index(data, refresh=False, id=key, index=index)
+        es.indices.refresh(index)
     except Exception, exc:
         index_update_counts.retry(args=[ids], exc=exc, **kw)
         raise
@@ -310,9 +310,9 @@ def index_download_counts(ids, **kw):
             key = '%s-%s' % (dl.addon_id, dl.date)
             data = search.extract_download_count(dl)
             for index in indices:
-                DownloadCount.index(data, bulk=True, id=key, index=index)
+                DownloadCount.index(data, refresh=False, id=key, index=index)

-        es.flush_bulk(forced=True)
+        es.indices.refresh(index)
     except Exception, exc:
         index_download_counts.retry(args=[ids], exc=exc)
         raise
@@ -339,8 +339,8 @@ def index_collection_counts(ids, **kw):
                 AddonCollectionCount.objects.filter(**filters),
                 CollectionStats.objects.filter(**filters))
             for index in indices:
-                CollectionCount.index(data, bulk=True, id=key, index=index)
-        es.flush_bulk(forced=True)
+                CollectionCount.index(data, refresh=False, id=key, index=index)
+        es.indices.refresh(index)
     except Exception, exc:
         index_collection_counts.retry(args=[ids], exc=exc)
         raise
@@ -362,8 +362,8 @@ def index_theme_user_counts(ids, **kw):
             key = '%s-%s' % (user_count.addon_id, user_count.date)
             data = search.extract_theme_user_count(user_count)
             for index in indices:
-                ThemeUserCount.index(data, bulk=True, id=key, index=index)
-        es.flush_bulk(forced=True)
+                ThemeUserCount.index(data, refresh=False, id=key, index=index)
+        es.indices.refresh(index)
     except Exception, exc:
         index_theme_user_counts.retry(args=[ids], exc=exc)
         raise
diff --git a/apps/stats/tests/test_cron.py b/apps/stats/tests/test_cron.py
index face4c93fd..9373c52f76 100644
--- a/apps/stats/tests/test_cron.py
+++ b/apps/stats/tests/test_cron.py
@@ -162,7 +162,7 @@ class TestIndexLatest(amo.tests.ESTestCase):
     def test_index_latest(self):
         latest = datetime.date.today() - datetime.timedelta(days=5)
         UpdateCount.index({'date': latest})
-        self.refresh('update_counts')
+        self.refresh('stats')

         start = latest.strftime('%Y-%m-%d')
         finish = datetime.date.today().strftime('%Y-%m-%d')
diff --git a/apps/stats/tests/test_views.py b/apps/stats/tests/test_views.py
index 031e6ce769..fa44bd94f4 100644
--- a/apps/stats/tests/test_views.py
+++ b/apps/stats/tests/test_views.py
@@ -87,7 +87,7 @@ class ESStatsTest(StatsTest, amo.tests.ESTestCase):
         tasks.index_download_counts(list(downloads))
         user_counts = ThemeUserCount.objects.values_list('id', flat=True)
         tasks.index_theme_user_counts(list(user_counts))
-        self.refresh('update_counts')
+        self.refresh('stats')


 class TestSeriesSecurity(StatsTest):
@@ -849,7 +849,7 @@ class TestCollections(amo.tests.ESTestCase):
                              'votes_down': x, 'downloads': x})}
             CollectionCount.index(data, id='%s-%s' % (x, self.collection.pk))

-        self.refresh('stats_collections_counts')
+        self.refresh('stats')

     def tests_collection_anon(self):
         res = self.client.get(self.url)
diff --git a/apps/stats/views.py b/apps/stats/views.py
index 939c69dba1..e616649753 100644
--- a/apps/stats/views.py
+++ b/apps/stats/views.py
@@ -18,6 +18,7 @@ from django.utils.cache import add_never_cache_headers, patch_cache_control
 from django.utils.datastructures import SortedDict

 from cache_nuggets.lib import memoize
+from dateutil.parser import parse
 from product_details import product_details

 import amo
@@ -56,7 +57,7 @@ def dashboard(request):
                    'stats_base_url': stats_base_url})


-def get_series(model, extra_field=None, **filters):
+def get_series(model, extra_field=None, source=None, **filters):
     """
     Get a generator of dicts for the stats model given by the filters.

@@ -67,13 +68,17 @@ def get_series(model, extra_field=None, **filters):
     extra = () if extra_field is None else (extra_field,)
     # Put a slice on it so we get more than 10 (the default), but limit to 365.
     qs = (model.search().order_by('-date').filter(**filters)
-          .values_dict('date', 'count', *extra))[:365]
-    for val in qs:
+          .values_dict('date', 'count', *extra))
+    if source:
+        qs = qs.source(source)
+    for val in qs[:365]:
         # Convert the datetimes to a date.
-        date_ = date(*val['date'][0].timetuple()[:3])
+        date_ = parse(val['date'][0]).date()
         rv = dict(count=val['count'][0], date=date_, end=date_)
         if extra_field:
             rv['data'] = extract(val[extra_field])
+        if source:
+            rv['data'] = extract(val[source])
         yield rv
@@ -200,7 +205,7 @@ def sources_series(request, addon, group, start, end, format):
     date_range = check_series_params_or_404(group, start, end, format)
     check_stats_permission(request, addon)

-    series = get_series(DownloadCount, extra_field='_source.sources',
+    series = get_series(DownloadCount, source='sources',
                         addon=addon.id, date__range=date_range)

     if format == 'csv':
@@ -235,13 +240,13 @@ def usage_breakdown_series(request, addon, group,
     check_stats_permission(request, addon)

     fields = {
-        'applications': '_source.apps',
-        'locales': '_source.locales',
-        'oses': '_source.os',
-        'versions': '_source.versions',
-        'statuses': '_source.status',
+        'applications': 'apps',
+        'locales': 'locales',
+        'oses': 'os',
+        'versions': 'versions',
+        'statuses': 'status',
     }
-    series = get_series(UpdateCount, extra_field=fields[field],
+    series = get_series(UpdateCount, source=fields[field],
                         addon=addon.id, date__range=date_range)
     if field == 'locales':
         series = process_locales(series)
@@ -624,7 +629,7 @@ def _collection_query(request, collection, start=None, end=None):
           .values_dict())[:365]
     series = []
     for val in qs:
-        date_ = date(*val['date'].timetuple()[:3])
+        date_ = parse(val['date']).date()
         series.append(dict(count=val['count'], date=date_, end=date_,
                            data=extract(val['data'])))
     return series
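[Editor's note] An illustrative call of the reworked get_series above, using one of the field names this patch maps in usage_breakdown_series ('apps'); addon and date_range are placeholders:

    # The breakdown now comes back via '_source' filtering rather than the
    # deprecated 'fields' lookup, and lands in rv['data'].
    series = get_series(UpdateCount, source='apps',
                        addon=addon.id, date__range=date_range)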
diff --git a/apps/users/cron.py b/apps/users/cron.py
index f9180c6312..46584cb778 100644
--- a/apps/users/cron.py
+++ b/apps/users/cron.py
@@ -47,7 +47,7 @@ def update_user_ratings():


 @cronjobs.register
-def reindex_users(index=None, aliased=True):
+def reindex_users(index=None):
     from . import tasks
     ids = UserProfile.objects.values_list('id', flat=True)
     taskset = [tasks.index_users.subtask(args=[chunk],
                                          kwargs=dict(index=index))
diff --git a/docs/topics/install-olympia/elasticsearch.rst b/docs/topics/install-olympia/elasticsearch.rst
index d554d76eea..ba6df7035e 100644
--- a/docs/topics/install-olympia/elasticsearch.rst
+++ b/docs/topics/install-olympia/elasticsearch.rst
@@ -10,7 +10,8 @@ the most relevant hits.

 Also check out `elasticsearch-head `_, a plugin with web front-end to
 elasticsearch that can be easier than talking to
-elasticsearch over curl.
+elasticsearch over curl, or `Marvel `_,
+which includes a query editor with autocompletion.

 Installation
 ------------
@@ -21,41 +22,10 @@ Elasticsearch comes with most package managers.::

 If Elasticsearch isn't packaged for your system, you can install it
 manually, `here are some good instructions on how to do so
-`_.
+`_.

-For running Olympia you must install the
-`ICU Analysis Plugin `_.
-See the `ICU Github Page `_
-for instructions on installing this plugin.
-
-On an Ubuntu box, this would mean running::
-
-    sudo /usr/share/elasticsearch/bin/plugin -install elasticsearch/elasticsearch-analysis-icu/1.13.0
-
-Settings
---------
-
-.. literalinclude:: /../scripts/elasticsearch/elasticsearch.yml
-
-We use a custom analyzer for indexing add-on names since they're a little
-different from normal text.
-
-To get the same results as our servers, configure Elasticsearch by copying the
-:src:`scripts/elasticsearch/elasticsearch.yml` (available in the
-``scripts/elasticsearch/`` folder of your install) to your system:
-
-* If on OS X, copy that file into
-  ``/usr/local/Cellar/elasticsearch/*/config/``.
-* On Linux, the directory is ``/etc/elasticsearch/``.
-
-.. note::
-
-    If you are on a linux box, make sure to comment out the 4 lines relevant to
-    the path configuration, unless it corresponds to an existing
-    ``/usr/local/var`` folder and you want it to be stored there.
-
-If you don't do this your results will be slightly different, but you probably
-won't notice.
+On Ubuntu, you should just download and install a .deb from the
+`download page `_.

 Launching and Setting Up
 ------------------------
@@ -63,6 +33,9 @@ Launching and Setting Up
 Launch the Elasticsearch service. If you used homebrew, ``brew info
 elasticsearch`` will show you the commands to launch. If you used aptitude,
 Elasticsearch will come with a start-stop daemon in /etc/init.d.
+On Ubuntu, if you have installed from a .deb, you can type::
+
+    sudo service elasticsearch start

 Olympia has commands that sets up mappings and indexes objects such as add-ons
 and apps for you. Setting up the mappings is analagous to defining the
@@ -102,8 +75,9 @@ maintained incrementally through post_save and post_delete hooks::

 Querying Elasticsearch in Django
 --------------------------------

-We use `elasticutils `_, a Python
-library that gives us a search API to elasticsearch.
+For now, we have our own query builder (which is a historical clone of
+`elasticutils `_), but we will
+switch to the official one very soon.

 We attach elasticutils to Django models with a mixin. This lets us do things
 like ``.search()`` which returns an object which acts a lot like Django's ORM's
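[Editor's note] A short sketch of that query builder in use, matching the patterns exercised by the tests in this patch (field values are illustrative):

    from addons.models import Addon

    qs = (Addon.search()                 # amo.search.ES via SearchMixin
          .filter(type=1)                # rendered as a 'filtered' clause
          .query(name__match='video')    # wrapped in function_score/boost
          .order_by('-weekly_downloads'))
    ids = [addon.id for addon in qs[:10]]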
-_STATS_ALIASES = {} -for k, v in _ALIASES.items(): - if 'stats' in v: - _ALIASES.pop(k) - _STATS_ALIASES[k] = v - - -def index_stats(index=None, aliased=True): - """Indexes the previous 365 days.""" - call_command('index_stats', addons=None) - - -_INDEXES = {'stats': [index_stats], - 'apps': [reindex_addons, - reindex_collections, - reindex_users, - compatibility_report]} + flag_reindexing_amo, timestamp_index) logger = logging.getLogger('z.elasticsearch') -DEFAULT_NUM_REPLICAS = 0 -DEFAULT_NUM_SHARDS = 3 -if hasattr(django_settings, 'ES_URLS'): - base_url = django_settings.ES_URLS[0] -else: - base_url = 'http://127.0.0.1:9200' - - -def url(path): - return '%s%s' % (base_url, path) - - -def _action(name, **kw): - return {name: kw} - - -def call_es(path, *args, **kw): - method = kw.pop('method', 'GET') - status = kw.pop('status', 200) - if isinstance(status, int): - status = [status] - - if not path.startswith('/'): - path = '/' + path - - method = getattr(requests, method.lower()) - res = method(url(path), *args, **kw) - - if res.status_code not in status: - error = CommandError('Call on %r failed.\n%s' % (path, res.content)) - error.content = res.content - error.json = res.json() - raise error - - return res +ES = get_es() def log(msg, stdout=sys.stdout): @@ -91,116 +29,45 @@ def log(msg, stdout=sys.stdout): @task_with_callbacks def delete_indexes(indexes, stdout=sys.stdout): - """Removes the indexes. - - - indexes: list of indexes names to remove. - """ - # now call the server - can we do this with a single call? - for index in indexes: - log('Removing index %r' % index, stdout=stdout) - call_es(index, method='DELETE') + indices = ','.join(indexes) + log('Removing indices %r' % indices, stdout=stdout) + ES.indices.delete(indices, ignore=[404, 500]) @task_with_callbacks -def run_aliases_actions(actions, stdout=sys.stdout): - """Run actions on aliases. - - - action: list of action/index/alias items - """ - # we also want to rename or delete the current index in case we have one - dump = [] - aliases = [] - - for action, index, alias in actions: - dump.append({action: {'index': index, 'alias': alias}}) - - if action == 'add': - aliases.append(alias) - - post_data = json.dumps({'actions': dump}) - - # now call the server - log('Rebuilding aliases with actions: %s' % dump, stdout=stdout) - try: - call_es('_aliases', post_data, method='POST') - except CommandError, e: - log('Initial command error: %s' % e, stdout=stdout) - # XXX Did not find a better way to extract the info - error = e.json['error'] - res = re.search('(Invalid alias name \[)(?P.*?)(\])', error) - if res is None: - raise - - index = res.groupdict()['index'] - log('Removing index %r' % index, stdout=stdout) - call_es(index, method='DELETE') - - # Now trying again - log('Trying again to rebuild the aliases', stdout=stdout) - call_es('_aliases', post_data, method='POST') +def update_aliases(actions, stdout=sys.stdout): + log('Rebuilding aliases with actions: %s' % actions, stdout=stdout) + ES.indices.update_aliases({'actions': actions}, ignore=404) @task_with_callbacks -def create_mapping(new_index, alias, num_replicas=DEFAULT_NUM_REPLICAS, - num_shards=DEFAULT_NUM_SHARDS, stdout=sys.stdout): - """Creates a mapping for the new index. - - - new_index: new index name. 
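The new helpers lean on elasticsearch-py's ``ignore`` option to make destructive calls idempotent, as in ``delete_indexes`` above. A small sketch of the pattern (the index name is made up)::

    # Without ignore, deleting a missing index raises a TransportError
    # with status 404; with ignore, the error body is returned instead.
    ES.indices.delete('addons-20140812000000', ignore=404)
    ES.indices.delete('addons-20140812000000', ignore=[404, 500])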
- - alias: alias name - - num_replicas: number of replicas in ES - - num_shards: number of shards in ES - """ - log('Create the mapping for index %r, alias: %r' % (new_index, alias), +def create_new_index(module, new_index, stdout=sys.stdout): + alias = module.get_alias() + log('Create the index {0}, for alias: {1}'.format(new_index, alias), stdout=stdout) - if requests.head(url('/' + alias)).status_code == 200: - res = call_es('%s/_settings' % (alias)).json() + config = {} + + # Retrieve settings from last index, if any + if ES.indices.exists(alias): + res = ES.indices.get_settings(alias) idx_settings = res.get(alias, {}).get('settings', {}) - else: - idx_settings = {} + config['number_of_replicas'] = idx_settings.get( + 'number_of_replicas', + settings.ES_DEFAULT_NUM_REPLICAS + ) + config['number_of_shards'] = idx_settings.get( + 'number_of_shards', + settings.ES_DEFAULT_NUM_SHARDS + ) - settings = { - 'number_of_replicas': idx_settings.get('number_of_replicas', - num_replicas), - 'number_of_shards': idx_settings.get('number_of_shards', - num_shards) - } - - # Create mapping without aliases since we do it manually - if not 'stats' in alias: - put_amo_mapping(new_index, aliased=False) - else: - put_stats_mapping(new_index, aliased=False) - - # Create new index - index_url = url('/%s' % new_index) - - # if the index already exists we can keep it - if requests.head(index_url).status_code == 200: - return - - call_es(index_url, json.dumps(settings), method='PUT', - status=(200, 201)) + module.create_new_index(new_index, config) @task_with_callbacks -def create_index(index, is_stats, stdout=sys.stdout): - """Create the index. - - - index: name of the index - - is_stats: if True, we're indexing stats - """ - log('Running all indexes for %r' % index, stdout=stdout) - indexers = is_stats and _INDEXES['stats'] or _INDEXES['apps'] - - for indexer in indexers: - log('Indexing %r' % indexer.__name__, stdout=stdout) - try: - indexer(index, aliased=False) - except Exception: - # We want to log this event but continue - log('Indexer %r failed' % indexer.__name__, stdout=stdout) - traceback.print_exc() +def index_data(module, index, stdout=sys.stdout): + log('Reindexing {0}'.format(index), stdout=stdout) + module.reindex(index) @task_with_callbacks @@ -232,9 +99,6 @@ Current Aliases configuration: class Command(BaseCommand): help = 'Reindex all ES indexes' option_list = BaseCommand.option_list + ( - make_option('--prefix', action='store', - help='Indexes prefixes, like test_', - default=''), make_option('--force', action='store_true', help=('Bypass the database flag that says ' 'another indexation is ongoing'), @@ -261,12 +125,11 @@ class Command(BaseCommand): raise CommandError('Indexation already occuring - use --force to ' 'bypass') - prefix = kwargs.get('prefix', '') log('Starting the reindexation', stdout=self.stdout) + modules = [addons_search] if kwargs.get('with_stats', False): - # Add the stats indexes back. - _ALIASES.update(_STATS_ALIASES) + modules.append(stats_search) if kwargs.get('wipe', False): confirm = raw_input('Are you sure you want to wipe all AMO ' @@ -277,28 +140,20 @@ class Command(BaseCommand): if confirm == 'yes': unflag_database(stdout=self.stdout) - for index in set(_ALIASES.values()): - requests.delete(url('/%s') % index) + for index in set(m.get_alias() for m in modules): + ES.indices.delete(index) else: raise CommandError("Aborted.") elif force: unflag_database(stdout=self.stdout) - # Get list current aliases at /_aliases. 
- all_aliases = requests.get(url('/_aliases')).json() + alias_actions = [] - # building the list of indexes - indexes = set([prefix + index for index in - _ALIASES.values()]) - - actions = [] - - def add_action(*elmt): - if elmt in actions: + def add_alias_action(action, index, alias): + action = {action: {'index': index, 'alias': alias}} + if action in alias_actions: return - actions.append(elmt) - - all_aliases = all_aliases.items() + alias_actions.append(action) # creating a task tree log('Building the task tree', stdout=self.stdout) @@ -308,46 +163,46 @@ class Command(BaseCommand): to_remove = [] # for each index, we create a new time-stamped index - for alias in indexes: - is_stats = 'stats' in alias + for module in modules: old_index = None + alias = module.get_alias() - for aliased_index, alias_ in all_aliases: - if alias in alias_['aliases'].keys(): + try: + olds = ES.indices.get_alias(alias) + except elasticsearch.TransportError: + pass + else: + for old_index in olds.keys(): # mark the index to be removed later - old_index = aliased_index - to_remove.append(aliased_index) - - # mark the alias to be removed as well - add_action('remove', aliased_index, alias) + to_remove.append(old_index) + add_alias_action('remove', old_index, alias) # create a new index, using the alias name with a timestamp new_index = timestamp_index(alias) # if old_index is None that could mean it's a full index # In that case we want to continue index in it - future_alias = url('/%s' % alias) - if requests.head(future_alias).status_code == 200: + if ES.indices.exists(alias): old_index = alias # flag the database step1 = tree.add_task(flag_database, args=[new_index, old_index, alias], kwargs={'stdout': self.stdout}) - step2 = step1.add_task(create_mapping, - args=[new_index, alias], + step2 = step1.add_task(create_new_index, + args=[module, new_index], kwargs={'stdout': self.stdout}) - step3 = step2.add_task(create_index, - args=[new_index, is_stats], + step3 = step2.add_task(index_data, + args=[module, new_index], kwargs={'stdout': self.stdout}) last_action = step3 # adding new index to the alias - add_action('add', new_index, alias) + add_alias_action('add', new_index, alias) # Alias the new index and remove the old aliases, if any. 
- renaming_step = last_action.add_task(run_aliases_actions, - args=[actions], + renaming_step = last_action.add_task(update_aliases, + args=[alias_actions], kwargs={'stdout': self.stdout}) # unflag the database - there's no need to duplicate the @@ -356,8 +211,9 @@ class Command(BaseCommand): kwargs={'stdout': self.stdout}) # Delete the old indexes, if any - delete.add_task(delete_indexes, - args=[to_remove], kwargs={'stdout': self.stdout}) + if to_remove: + delete.add_task(delete_indexes, + args=[to_remove], kwargs={'stdout': self.stdout}) # let's do it log('Running all indexation tasks', stdout=self.stdout) @@ -365,7 +221,8 @@ class Command(BaseCommand): os.environ['FORCE_INDEXING'] = '1' try: tree.apply_async() - time.sleep(10) # give celeryd some time to flag the DB + if not getattr(settings, 'CELERY_ALWAYS_EAGER', False): + time.sleep(10) # give celeryd some time to flag the DB while is_reindexing_amo(): sys.stdout.write('.') sys.stdout.flush() @@ -376,7 +233,7 @@ class Command(BaseCommand): sys.stdout.write('\n') # let's return the /_aliases values - aliases = call_es('_aliases').json() + aliases = ES.indices.get_aliases() aliases = json.dumps(aliases, sort_keys=True, indent=4) - summary = _SUMMARY % (len(indexes), aliases) + summary = _SUMMARY % (len(modules), aliases) log(summary, stdout=self.stdout) diff --git a/lib/es/tests/test_commands.py b/lib/es/tests/test_commands.py index 17573e983b..d782a4e38b 100644 --- a/lib/es/tests/test_commands.py +++ b/lib/es/tests/test_commands.py @@ -12,9 +12,10 @@ import amo.search import amo.tests from amo.urlresolvers import reverse from amo.utils import urlparams -from es.management.commands.reindex import call_es from lib.es.utils import is_reindexing_amo, unflag_reindexing_amo +ES = amo.search.get_es() + class TestIndexCommand(amo.tests.ESTestCase): @@ -26,13 +27,13 @@ class TestIndexCommand(amo.tests.ESTestCase): self.url = reverse('search.search') # Any index created during the test will be deleted. - self.indices = call_es('_status').json()['indices'].keys() + self.indices = ES.indices.status()['indices'].keys() def tearDown(self): - current_indices = call_es('_status').json()['indices'].keys() + current_indices = ES.indices.status()['indices'].keys() for index in current_indices: if index not in self.indices: - call_es(index, method='DELETE') + ES.indices.delete(index, ignore=404) def check_results(self, expected): """Make sure the expected addons are listed in a standard search.""" @@ -55,7 +56,7 @@ class TestIndexCommand(amo.tests.ESTestCase): def get_indices_aliases(self): """Return the test indices with an alias.""" - indices = call_es('_aliases').json() + indices = ES.indices.get_aliases() items = [(index, aliases['aliases'].keys()[0]) for index, aliases in indices.items() if len(aliases['aliases']) > 0 and index.startswith('test')] @@ -102,7 +103,7 @@ class TestIndexCommand(amo.tests.ESTestCase): wanted.append(amo.tests.addon_factory()) connection._commit() connection.clean_savepoints() - amo.search.get_es().refresh() + self.refresh() self.check_results(wanted) if len(wanted) == old_addons_count: @@ -117,7 +118,7 @@ class TestIndexCommand(amo.tests.ESTestCase): # The reindexation is done, let's double check we have all our docs. connection._commit() connection.clean_savepoints() - amo.search.get_es().refresh() + self.refresh() self.check_results(wanted) # New indices have been created, and aliases now point to them. 
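Taken together, the command now gives each alias a zero-downtime rotation. A condensed, synchronous sketch of what the task tree above schedules (index names are illustrative, and the real tasks also take a ``stdout`` argument and run through celery_tasktree)::

    alias = 'addons'
    # get_alias() maps concrete index names to their aliases, e.g.
    # {'addons-20140812000000': {'aliases': {'addons': {}}}}, which is
    # why olds.keys() lists the indexes to retire.
    old_indexes = ES.indices.get_alias(alias).keys()

    new_index = timestamp_index(alias)            # 'addons-20140901120000'
    create_new_index(addons_search, new_index)    # settings + mappings
    index_data(addons_search, new_index)          # full reindex

    actions = [{'add': {'index': new_index, 'alias': alias}}]
    actions += [{'remove': {'index': idx, 'alias': alias}}
                for idx in old_indexes]
    update_aliases(actions)                       # atomic alias swap
    delete_indexes(old_indexes)                   # drop retired indexes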
diff --git a/lib/es/utils.py b/lib/es/utils.py index 16c4651b5c..b107944ac0 100644 --- a/lib/es/utils.py +++ b/lib/es/utils.py @@ -1,9 +1,17 @@ import os +import datetime +import logging + +from django.core.management.base import CommandError +from django.conf import settings + +from elasticsearch import helpers import amo.search -from .models import Reindexing -from django.core.management.base import CommandError +from .models import Reindexing + +log = logging.getLogger('z.es') # shortcut functions is_reindexing_amo = Reindexing.objects.is_reindexing_amo @@ -25,13 +33,19 @@ def index_objects(ids, model, search, index=None, transforms=None): for t in transforms: qs = qs.transform(t) + bulk = [] for ob in qs: data = search.extract(ob) - for index in indices: - model.index(data, bulk=True, id=ob.id, index=index) + bulk.append({ + "_source": data, + "_id": ob.id, + "_type": ob.get_mapping_type(), + "_index": index + }) - amo.search.get_es().flush_bulk(forced=True) + es = amo.search.get_es() + return helpers.bulk(es, bulk) def raise_if_reindex_in_progress(site): @@ -44,3 +58,36 @@ def raise_if_reindex_in_progress(site): if already_reindexing and 'FORCE_INDEXING' not in os.environ: raise CommandError("Indexation already occuring. Add a FORCE_INDEXING " "variable in the environ to force it") + + +def timestamp_index(index): + """Returns index-YYYYMMDDHHMMSS with the current time.""" + return '%s-%s' % (index, datetime.datetime.now().strftime('%Y%m%d%H%M%S')) + + +def create_index(index, config=None): + """Creates an index if it's not present. + + Returns the index name. + + Options: + + - index: name of the index. + - config: if provided, used as the settings option for the + ES calls. + """ + es = amo.search.get_es() + + if settings.IN_TEST_SUITE: + if not config: + config = {} + # Be nice to ES running on ci.mozilla.org + config.update({ + 'number_of_shards': 3, + 'number_of_replicas': 0 + }) + + if not es.indices.exists(index): + es.indices.create(index, body=config, ignore=400) + + return index diff --git a/lib/log_settings_base.py b/lib/log_settings_base.py index 38d93d4a49..faf2de12b7 100644 --- a/lib/log_settings_base.py +++ b/lib/log_settings_base.py @@ -80,7 +80,7 @@ loggers = { 'newrelic': { 'level': 'WARNING', }, - 'elasticutils': { + 'elasticsearch': { 'level': 'WARNING', }, 'suds': { diff --git a/lib/settings_base.py b/lib/settings_base.py index 8166307ca0..2e2411d27e 100644 --- a/lib/settings_base.py +++ b/lib/settings_base.py @@ -1097,7 +1097,7 @@ LOGGING = { 'amqplib': {'handlers': ['null']}, 'caching.invalidation': {'handlers': ['null']}, 'caching': {'level': logging.WARNING}, - 'pyes': {'handlers': ['null']}, + 'elasticsearch': {'handlers': ['null']}, 'rdflib': {'handlers': ['null']}, 'suds': {'handlers': ['null']}, 'z.task': {'level': logging.INFO}, @@ -1277,12 +1277,11 @@ BUILDER_VERSIONS_URL = ('https://builder.addons.mozilla.org/repackage/' + ## elasticsearch ES_HOSTS = ['127.0.0.1:9200'] ES_URLS = ['http://%s' % h for h in ES_HOSTS] -ES_INDEXES = {'default': 'addons', - 'update_counts': 'addons_stats', - 'download_counts': 'addons_stats', - 'stats_contributions': 'addons_stats', - 'stats_collections_counts': 'addons_stats', - 'users_install': 'addons_stats'} +ES_INDEXES = { + 'default': 'addons', + 'stats': 'addons_stats', +} + ES_TIMEOUT = 30 ES_DEFAULT_NUM_REPLICAS = 2 ES_DEFAULT_NUM_SHARDS = 5 diff --git a/migrations/264-locale-indexes.py b/migrations/264-locale-indexes.py index 88ff402c73..939a1a25d3 100644 --- a/migrations/264-locale-indexes.py +++ 
b/migrations/264-locale-indexes.py @@ -1,20 +1,19 @@ -from django.conf import settings +# from django.conf import settings -import elasticutils +# import elasticutils -from addons.search import setup_mapping +# from addons.search import setup_mapping -def columns(): - es = elasticutils.get_es() - index = settings.ES_INDEXES['default'] - return es.get_mapping('addons', index)['addons']['properties'].keys() +# def columns(): +# es = elasticutils.get_es() +# index = settings.ES_INDEXES['default'] +# return es.get_mapping('addons', index)['addons']['properties'].keys() -def run(): - if 'name_finnish' not in columns(): - print 'ok' - setup_mapping() - else: - print 'skippint' - assert 'name_finnish' in columns() - +# def run(): +# if 'name_finnish' not in columns(): +# print 'ok' +# setup_mapping() +# else: +# print 'skippint' +# assert 'name_finnish' in columns() diff --git a/requirements/prod.txt b/requirements/prod.txt index 1a29ad5961..2857db3486 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -40,7 +40,7 @@ django-statsd-mozilla==0.3.10 django-storages==1.1.8 django-waffle==0.9.2 easy-thumbnails==1.4 -elasticutils==0.8.2 +elasticsearch==1.0.0 email-reply-parser==0.2.0 fastchardet==0.2.0 feedparser==5.1.3 @@ -74,8 +74,6 @@ polib==1.0.3 protobuf==2.5.0 pyasn1==0.1.7 PyBrowserID==0.6 -pyelasticsearch==0.6.1 -pyes==0.16.0 PyJWT-mozilla==0.1.4.2 PyMySQL==0.5 pymemcache==1.2 @@ -99,6 +97,7 @@ SQLAlchemy==0.7.5 statsd==2.0.3 suds==0.4 thrift==0.9.1 +urllib3==1.9 ## Not on pypi. -e git+https://github.com/mozilla/amo-validator.git@1.3.1#egg=amo-validator diff --git a/scripts/elasticsearch/elasticsearch.yml b/scripts/elasticsearch/elasticsearch.yml deleted file mode 100644 index b25aab4ed1..0000000000 --- a/scripts/elasticsearch/elasticsearch.yml +++ /dev/null @@ -1,28 +0,0 @@ -cluster: - name: wooyeah - -# Don't try to cluster with other machines during local development. -# Remove the following 3 lines to enable default clustering. 
-network.host: localhost -discovery.zen.ping.multicast.enabled: false -discovery.zen.ping.unicast.hosts: ["localhost"] - -path: - logs: /usr/local/var/log - data: /usr/local/var/data - plugins: /usr/local/var/lib/elasticsearch/plugins - -index: - analysis: - analyzer: - standardPlusWordDelimiter: - type: custom - tokenizer: standard - filter: [standard, wordDelim, lowercase, stop, dict] - filter: - wordDelim: - type: word_delimiter - preserve_original: true - dict: - type: dictionary_decompounder - word_list: [cool, iris, fire, bug, flag, fox, grease, monkey, flash, block, forecast, screen, grab, cookie, auto, fill, text, all, so, think, mega, upload, download, video, map, spring, fix, input, clip, fly, lang, up, down, persona, css, html, all, http, ball, firefox, bookmark, chat, zilla, edit, menu, menus, status, bar, with, easy, sync, search, google, time, window, js, super, scroll, title, close, undo, user, inspect, inspector, browser, context, dictionary, mail, button, url, password, secure, image, new, tab, delete, click, name, smart, down, manager, open, query, net, link, blog, this, color, select, key, keys, foxy, translate, word] diff --git a/scripts/elasticsearch/run_locally.py b/scripts/elasticsearch/run_locally.py deleted file mode 100755 index d02e823f7a..0000000000 --- a/scripts/elasticsearch/run_locally.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python -import os - -if __name__ == '__main__': - cf = os.path.join(os.path.dirname(__file__), 'elasticsearch.yml') - cf = os.path.abspath(cf) - os.system('elasticsearch -f -D es.config=%s' % cf) diff --git a/scripts/elasticsearch/wordlist.txt b/scripts/elasticsearch/wordlist.txt deleted file mode 100644 index d89e0de502..0000000000 --- a/scripts/elasticsearch/wordlist.txt +++ /dev/null @@ -1,94 +0,0 @@ -cool -iris -fire -bug -flag -fox -grease -monkey -flash -block -forecast -screen -grab -cookie -auto -fill -text -all -so -think -mega -upload -download -video -map -spring -fix -input -clip -fly -lang -up -down -persona -css -html -all -http -ball -firefox -bookmark -chat -zilla -edit -menu -menus -status -bar -with -easy -sync -search -google -time -window -js -super -scroll -title -close -undo -user -inspect -inspector -browser -context -dictionary -mail -button -url -password -secure -image -new -tab -delete -click -name -smart -down -manager -open -query -net -link -blog -this -color -select -key -keys -foxy -translate -word
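With ``elasticsearch.yml`` and ``wordlist.txt`` deleted, the ``standardPlusWordDelimiter`` analyzer can no longer come from server-wide config; it has to travel with the index settings applied at creation time. A sketch of the deleted analysis block as the Python dict an index-creation call might receive (``INDEX_SETTINGS`` and ``WORD_LIST`` are assumed names; the word list is the one removed above)::

    INDEX_SETTINGS = {
        'analysis': {
            'analyzer': {
                'standardPlusWordDelimiter': {
                    'type': 'custom',
                    'tokenizer': 'standard',
                    'filter': ['standard', 'wordDelim', 'lowercase',
                               'stop', 'dict'],
                },
            },
            'filter': {
                'wordDelim': {
                    'type': 'word_delimiter',
                    'preserve_original': True,
                },
                'dict': {
                    'type': 'dictionary_decompounder',
                    # WORD_LIST is the decompounder list deleted above.
                    'word_list': WORD_LIST,
                },
            },
        },
    }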