Merge pull request #493 from tarekziade/808505-check-indexer-flag

Block indexing cron while a reindexing is going on (Bug 808505)

Commit 1694b826b2
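The patch adds a raise_if_reindex_in_progress() helper to lib.es.utils and calls it at the top of each indexing-related cron job, so a cron run aborts with a CommandError while a full reindex has flagged the database, unless FORCE_INDEXING is set in the environment. A minimal sketch of the resulting guard pattern (the cron job name below is illustrative, not part of this patch):

import cronjobs

from lib.es.utils import raise_if_reindex_in_progress


@cronjobs.register
def some_indexing_cron():
    """Hypothetical cron job guarded by the new helper."""
    # Aborts with CommandError while the reindexing flag is set in the DB,
    # unless FORCE_INDEXING is present in the environment.
    raise_if_reindex_in_progress()
    # ... normal indexing work would go here ...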
@@ -24,6 +24,7 @@ from amo.utils import chunked
 from addons import search
 from addons.models import Addon, FrozenAddon, AppSupport
 from files.models import File
+from lib.es.utils import raise_if_reindex_in_progress
 from stats.models import UpdateCount

 log = logging.getLogger('z.cron')

@@ -94,6 +95,7 @@ def _update_addons_current_version(data, **kw):
 @cronjobs.register
 def update_addon_average_daily_users():
     """Update add-ons ADU totals."""
+    raise_if_reindex_in_progress()
     cursor = connections[multidb.get_slave()].cursor()
     q = """SELECT
                addon_id, AVG(`count`)
@@ -1,3 +1,6 @@
+import os
+import datetime
+
 from nose.tools import eq_
 import mock

@@ -5,7 +8,10 @@ import amo
 import amo.tests
 from addons import cron
 from addons.models import Addon, AppSupport
+from django.core.management.base import CommandError
 from files.models import File, Platform
+from lib.es.management.commands.reindex import flag_database, unflag_database
+from stats.models import UpdateCount
 from versions.models import Version


@@ -211,6 +217,35 @@ class AvgDailyUserCountTestCase(amo.tests.TestCase):
         addon = Addon.objects.get(pk=3615)
         eq_(addon.average_daily_users, addon.total_downloads)

+    def test_adu_flag(self):
+        addon = Addon.objects.get(pk=3615)
+
+        now = datetime.datetime.now()
+        counter = UpdateCount.objects.create(addon=addon, date=now,
+                                             count=1234)
+        counter.save()
+
+        self.assertTrue(
+            addon.average_daily_users > addon.total_downloads + 10000,
+            'Unexpected ADU count. ADU of %d not greater than %d' % (
+                addon.average_daily_users, addon.total_downloads + 10000))
+
+        adu = cron.update_addon_average_daily_users
+        flag_database('new', 'old', 'alias')
+        try:
+            # Should fail.
+            self.assertRaises(CommandError, adu)
+
+            # Should work with the environ flag.
+            os.environ['FORCE_INDEXING'] = '1'
+            adu()
+        finally:
+            unflag_database()
+            del os.environ['FORCE_INDEXING']
+
+        addon = Addon.objects.get(pk=3615)
+        eq_(addon.average_daily_users, 1234)
+

 class TestReindex(amo.tests.ESTestCase):
@@ -16,6 +16,7 @@ from bandwagon.models import Collection
 from cake.models import Session
 from constants.base import VALID_STATUSES
 from devhub.models import ActivityLog, LegacyAddonLog
+from lib.es.utils import raise_if_reindex_in_progress
 from sharing import SERVICES_LIST, LOCAL_SERVICES_LIST
 from stats.models import AddonShareCount, Contribution

@@ -244,6 +245,7 @@ def weekly_downloads():
     """
     Update 7-day add-on download counts.
     """
+    raise_if_reindex_in_progress()
     cursor = connection.cursor()
     cursor.execute("""
         SELECT addon_id, SUM(count) AS weekly_count
@@ -12,6 +12,7 @@ from addons.models import Addon
 from .models import (AddonCollectionCount, CollectionCount,
                      UpdateCount)
 from . import tasks
+from lib.es.utils import raise_if_reindex_in_progress

 task_log = commonware.log.getLogger('z.task')
 cron_log = commonware.log.getLogger('z.cron')

@@ -20,6 +21,7 @@ cron_log = commonware.log.getLogger('z.cron')
 @cronjobs.register
 def update_addons_collections_downloads():
     """Update addons+collections download totals."""
+    raise_if_reindex_in_progress()

     d = (AddonCollectionCount.objects.values('addon', 'collection')
           .annotate(sum=Sum('count')))

@@ -44,6 +46,7 @@ def update_collections_total():
 @cronjobs.register
 def update_global_totals(date=None):
     """Update global statistics totals."""
+    raise_if_reindex_in_progress()

     if date:
         date = datetime.datetime.strptime(date, '%Y-%m-%d').date()

@@ -70,6 +73,7 @@ def addon_total_contributions():

 @cronjobs.register
 def index_latest_stats(index=None, aliased=True):
+    raise_if_reindex_in_progress()
     latest = UpdateCount.search(index).order_by('-date').values_dict()
     if latest:
         latest = latest[0]['date']
@@ -2,6 +2,7 @@ import datetime
 import json
 import logging
 from optparse import make_option
+import os
 import re
 import sys
 import traceback

@@ -25,6 +26,7 @@ from stats.search import setup_indexes as put_stats_mapping
 from users.cron import reindex_users

 from lib.es.models import Reindexing
+from lib.es.utils import database_flagged

 if django_settings.MARKETPLACE:
     from mkt.stats.cron import index_mkt_stats

@@ -214,11 +216,6 @@ def create_index(index, is_stats):
         traceback.print_exc()


-def database_flagged():
-    """Returns True if the Database is being indexed"""
-    return Reindexing.objects.exists()
-
-
 @task_with_callbacks
 def flag_database(new_index, old_index, alias):
     """Flags the database to indicate that the reindexing has started."""

@@ -363,13 +360,17 @@ class Command(BaseCommand):

         # let's do it
         log('Running all indexation tasks')
-        tree.apply_async()

-        time.sleep(10)  # give celeryd some time to flag the DB
-        while database_flagged():
-            sys.stdout.write('.')
-            sys.stdout.flush()
-            time.sleep(5)
+        os.environ['FORCE_INDEXING'] = '1'
+        try:
+            tree.apply_async()
+            time.sleep(10)  # give celeryd some time to flag the DB
+            while database_flagged():
+                sys.stdout.write('.')
+                sys.stdout.flush()
+                time.sleep(5)
+        finally:
+            del os.environ['FORCE_INDEXING']

         sys.stdout.write('\n')
@@ -1,4 +1,7 @@
+import os
+
 from .models import Reindexing
+from django.core.management.base import CommandError
 import elasticutils.contrib.django as elasticutils


@@ -34,3 +37,19 @@ def index_objects(ids, model, search, index=None, transforms=None):
         model.index(data, bulk=True, id=ob.id, index=index)

     elasticutils.get_es().flush_bulk(forced=True)
+
+
+def database_flagged():
+    """Returns True if the Database is being indexed"""
+    return Reindexing.objects.exists()
+
+
+def raise_if_reindex_in_progress():
+    """Checks if the database indexation flag is on.
+
+    If it's one, and if no "FORCE_INDEXING" variable is present in the env,
+    raises a CommandError.
+    """
+    if database_flagged() and 'FORCE_INDEXING' not in os.environ:
+        raise CommandError("Indexation already occuring. Add a FORCE_INDEXING "
+                           "variable in the environ to force it")
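The guard is deliberately overridable: the reindex command exports FORCE_INDEXING around its task tree, and the new tests set the same variable before calling the guarded crons directly. A minimal sketch of that escape hatch as a standalone helper (hypothetical, not part of this patch):

import os


def force_run(cron_job):
    """Hypothetical wrapper: run a guarded cron job even while the DB is flagged."""
    os.environ['FORCE_INDEXING'] = '1'  # lets raise_if_reindex_in_progress() pass
    try:
        cron_job()
    finally:
        del os.environ['FORCE_INDEXING']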
@@ -7,6 +7,7 @@ import cronjobs
 import pyes

 from stats.models import Contribution
+from lib.es.utils import raise_if_reindex_in_progress
 from mkt.webapps.models import Installed
 from . import tasks

@@ -15,6 +16,7 @@ cron_log = commonware.log.getLogger('mkt.cron')

 @cronjobs.register
 def index_latest_mkt_stats(index=None, aliased=True):
+    raise_if_reindex_in_progress()
     yesterday = datetime.date.today() - datetime.timedelta(days=1)

     try:
@@ -10,6 +10,7 @@ from django.db.models import Count
 import commonware.log
 import cronjobs
 from celery.task.sets import TaskSet
+from lib.es.utils import raise_if_reindex_in_progress

 import amo
 from amo.utils import chunked

@@ -23,6 +24,7 @@ log = commonware.log.getLogger('z.cron')
 @cronjobs.register
 def update_weekly_downloads():
     """Update the weekly "downloads" from the users_install table."""
+    raise_if_reindex_in_progress()
     interval = datetime.today() - timedelta(days=7)
     counts = (Installed.objects.values('addon')
               .filter(created__gte=interval,
@@ -9,6 +9,8 @@ from nose.tools import eq_
 import amo
 import amo.tests
 from addons.models import Addon
+from django.core.management.base import CommandError
+from lib.es.management.commands.reindex import flag_database, unflag_database
 from users.models import UserProfile
 from mkt.webapps.cron import clean_old_signed, update_weekly_downloads
 from mkt.webapps.models import Installed, Webapp

@@ -39,6 +41,27 @@ class TestWeeklyDownloads(amo.tests.TestCase):
         update_weekly_downloads()
         eq_(self.get_webapp().weekly_downloads, 2)

+    def test_weekly_downloads_flagged(self):
+        eq_(self.get_webapp().weekly_downloads, 0)
+        self.add_install()
+        self.add_install(user=UserProfile.objects.get(pk=10482),
+                         created=datetime.today() - timedelta(days=2))
+
+        flag_database('new', 'old', 'alias')
+        try:
+            # Should fail.
+            self.assertRaises(CommandError, update_weekly_downloads)
+            eq_(self.get_webapp().weekly_downloads, 0)
+
+            # Should work with the environ flag.
+            os.environ['FORCE_INDEXING'] = '1'
+            update_weekly_downloads()
+        finally:
+            unflag_database()
+            del os.environ['FORCE_INDEXING']
+
+        eq_(self.get_webapp().weekly_downloads, 2)
+
     def test_recently(self):
         self.add_install(created=datetime.today() - timedelta(days=6))
         update_weekly_downloads()