use celery's task dependency management to chain and parallelize reindexing. (bug 867289)

Allen Short 2013-06-04 02:39:42 -07:00
Parent 25ebeee1c2
Commit 164d41e4b6
11 changed files with 86 additions and 70 deletions
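The heart of the change: celery's `TaskSet` (since removed upstream) gives way to the canvas primitives, with `group` for parallel fan-out and `|` for chaining, and `.subtask(args=..., kwargs=...)` calls become immutable `.si(...)` signatures. A minimal sketch of the before/after pattern, assuming celery 3.x; the task and the id chunks are illustrative, not from this commit:

from celery import group, task

@task(ignore_result=False)
def index_chunk(ids, index=None):
    # Stand-in for the real indexing tasks patched below.
    return len(ids)

chunks = [[1, 2, 3], [4, 5, 6]]
# Old: TaskSet([index_chunk.subtask(args=[c]) for c in chunks]).apply_async()
# New: a group of immutable signatures, dispatched in parallel.
group(index_chunk.si(c, index='addons_new') for c in chunks).apply_async()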

View file

@@ -15,7 +15,7 @@ import cronjobs
 import multidb
 import path
 from lib import recommend
-from celery.task.sets import TaskSet
+from celery import group
 from celeryutils import task
 import waffle
@@ -68,7 +68,7 @@ def update_addons_current_version():
     ts = [_update_addons_current_version.subtask(args=[chunk])
           for chunk in chunked(d, 100)]
-    TaskSet(ts).apply_async()
+    group(ts).apply_async()
 # TODO(jbalogh): removed from cron on 6/27/11. If the site doesn't break,
@@ -109,7 +109,7 @@ def update_addon_average_daily_users():
     ts = [_update_addon_average_daily_users.subtask(args=[chunk])
           for chunk in chunked(d, 250)]
-    TaskSet(ts).apply_async()
+    group(ts).apply_async()
 @task
@@ -155,7 +155,7 @@ def update_addon_download_totals():
     ts = [_update_addon_download_totals.subtask(args=[chunk])
           for chunk in chunked(d, 250)]
-    TaskSet(ts).apply_async()
+    group(ts).apply_async()
 @task
@@ -228,7 +228,7 @@ def update_addon_appsupport():
     ts = [_update_appsupport.subtask(args=[chunk])
           for chunk in chunked(ids, 20)]
-    TaskSet(ts).apply_async()
+    group(ts).apply_async()
 @cronjobs.register
@@ -445,6 +445,10 @@ def give_personas_versions():
 @cronjobs.register
 def reindex_addons(index=None, aliased=True, addon_type=None):
+    reindex_addons_task(index, aliased, addon_type)()
+def reindex_addons_task(index=None, aliased=True, addon_type=None):
     from . import tasks
     # Make sure our mapping is up to date.
     search.setup_mapping(index, aliased)
@@ -454,19 +458,23 @@ def reindex_addons(index=None, aliased=True, addon_type=None):
                                     disabled_by_user=False))
     if addon_type:
         ids = ids.filter(type=addon_type)
-    ts = [tasks.index_addons.subtask(args=[chunk], kwargs=dict(index=index))
-          for chunk in chunked(sorted(list(ids)), 150)]
-    TaskSet(ts).apply_async()
+    p = group([tasks.index_addons.si(chunk, index=index)
+               for chunk in chunked(sorted(list(ids)), 150)])
+    return p
 @cronjobs.register
 def reindex_apps(index=None, aliased=True):
+    reindex_apps_task(index, aliased)()
+def reindex_apps_task(index=None, aliased=True):
     """Apps do get indexed by `reindex_addons`, but run this for apps only."""
     from . import tasks
     search.setup_mapping(index, aliased)
     ids = (Addon.objects.values_list('id', flat=True)
            .filter(type=amo.ADDON_WEBAPP, status__in=amo.VALID_STATUSES,
                    disabled_by_user=False))
-    ts = [tasks.index_addons.subtask(args=[chunk], kwargs=dict(index=index))
-          for chunk in chunked(sorted(list(ids)), 150)]
-    TaskSet(ts).apply_async()
+    return group([tasks.index_addons.si(chunk, index=index)
+                  for chunk in chunked(sorted(list(ids)), 150)])
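`reindex_addons` stays the cron entry point, while the new `reindex_addons_task` returns the group unlaunched so the reindex command below can compose it. The `.si()` spelling matters there: an immutable signature does not receive the previous task's return value when chained, unlike `.s()`/`.subtask()`. A sketch of the distinction, with hypothetical tasks:

from celery import task

@task(ignore_result=False)
def add(x, y):
    return x + y

@task(ignore_result=False)
def report(result=None):
    return 'got %r' % (result,)

# Mutable: report receives add's result as its first argument.
(add.s(2, 2) | report.s()).apply_async()
# Immutable: report runs exactly as written; add's result is dropped.
(add.s(2, 2) | report.si()).apply_async()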

View file

@@ -106,7 +106,7 @@ def delete_preview_files(id, **kw):
         log.error('Error deleting preview file (%s): %s' % (f, e))
-@task(acks_late=True)
+@task(acks_late=True, ignore_result=False)
 def index_addons(ids, **kw):
     log.info('Indexing addons %s-%s. [%s]' % (ids[0], ids[-1], len(ids)))
     transforms = (attach_categories, attach_devices, attach_prices,
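`ignore_result=False` opts `index_addons` back into result storage, which the reindex command below relies on when it polls the pipeline for completion. A sketch of why it matters, under this commit's settings (amqp result backend); the id chunks are illustrative:

import time
from celery import group

# index_addons is the task decorated above; results must be stored
# for GroupResult.ready() to ever report completion.
res = group(index_addons.si(chunk, index='addons_new')
            for chunk in [[1, 2], [3, 4]]).apply_async()
while not res.ready():
    time.sleep(5)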

View file

@@ -5,7 +5,7 @@ from django.db import connection, transaction
 from django.db.models import Count
 import commonware.log
-from celery.task.sets import TaskSet
+from celery import group
 from celeryutils import task
 import amo
@@ -185,9 +185,12 @@ def _drop_collection_recs(**kw):
 @cronjobs.register
 def reindex_collections(index=None, aliased=True):
+    reindex_collections_task(index, aliased).apply_async()
+def reindex_collections_task(index=None, aliased=True):
     from . import tasks
     ids = (Collection.objects.exclude(type=amo.COLLECTION_SYNCHRONIZED)
            .values_list('id', flat=True))
-    taskset = [tasks.index_collections.subtask(args=[chunk], kwargs=dict(index=index))
+    taskset = [tasks.index_collections.si(chunk, index=index)
               for chunk in chunked(sorted(list(ids)), 150)]
-    TaskSet(taskset).apply_async()
+    return group(taskset)

View file

@@ -102,7 +102,7 @@ def collection_watchers(*ids, **kw):
         log.error('Updating collection watchers failed: %s, %s' % (pk, e))
-@task
+@task(ignore_result=False)
 def index_collections(ids, **kw):
     log.debug('Indexing collections %s-%s [%s].' % (ids[0], ids[-1], len(ids)))
     index = kw.pop('index', None)

View file

@@ -1,6 +1,8 @@
 import logging
 from collections import defaultdict
+from celery import task
 from django.db.models import Count, Max
 import cronjobs
@@ -21,6 +23,13 @@ log = logging.getLogger('z.compat')
 @cronjobs.register
 def compatibility_report(index=None, aliased=True):
+    return compatibility_report_task(index, aliased)()
+def compatibility_report_task(index=None, aliased=True):
+    return _compatibility_report.si(index, aliased)
+@task(ignore_result=False)
+def _compatibility_report(index=None, aliased=True):
     docs = defaultdict(dict)
     indices = get_indices(index)
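This file shows the commit's pattern in its purest form: the `@cronjobs.register` wrapper keeps the old name and runs immediately, while the `_task` sibling returns an unlaunched signature for the reindex command to compose. Calling a signature directly executes the task body inline in the current process, which is exactly what the cron wrapper wants. A sketch with hypothetical names:

from celery import task

@task(ignore_result=False)
def _report(index=None, aliased=True):
    return 'report for %r' % index

def report_task(index=None, aliased=True):
    # Nothing runs here; we only build a composable signature.
    return _report.si(index, aliased)

report_task('addons')()      # cron: execute inline, right now
sig = report_task('addons')  # orchestration: hand to a chain or group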

View file

@@ -2,7 +2,7 @@ from django.db import connections
 import commonware.log
 import multidb
-from celery.task.sets import TaskSet
+from celery import group
 import cronjobs
 from amo import VALID_STATUSES
@@ -43,13 +43,16 @@ def update_user_ratings():
     ts = [update_user_ratings_task.subtask(args=[chunk])
           for chunk in chunked(d, 1000)]
-    TaskSet(ts).apply_async()
+    group(ts).apply_async()
 @cronjobs.register
 def reindex_users(index=None, aliased=True):
+    return reindex_users_task(index, aliased).apply_async()
+def reindex_users_task(index=None, aliased=True):
     from . import tasks
     ids = UserProfile.objects.values_list('id', flat=True)
-    taskset = [tasks.index_users.subtask(args=[chunk], kwargs=dict(index=index))
-               for chunk in chunked(sorted(list(ids)), 150)]
-    TaskSet(taskset).apply_async()
+    taskset = group([tasks.index_users.si(chunk, index=index)
+                     for chunk in chunked(sorted(list(ids)), 150)])
+    return taskset

View file

@@ -42,7 +42,7 @@ def resize_photo(src, dst, locally=False, **kw):
         task_log.error("Error saving userpic: %s" % e)
-@task
+@task(ignore_result=False)
 def index_users(ids, **kw):
     task_log.debug('Indexing users %s-%s [%s].' % (ids[0], ids[-1], len(ids)))
     index = kw.pop('index', None)

View file

@@ -9,26 +9,26 @@ import traceback
 from optparse import make_option
 import requests
-from celery_tasktree import task_with_callbacks, TaskTree
+from celery import chain, group, task
 from django.conf import settings as django_settings
 from django.core.management import call_command
 from django.core.management.base import BaseCommand, CommandError
-from addons.cron import reindex_addons, reindex_apps
+from addons.cron import reindex_addons_task
 from amo.utils import timestamp_index
 from apps.addons.search import setup_mapping as put_amo_mapping
-from bandwagon.cron import reindex_collections
-from compat.cron import compatibility_report
+from bandwagon.cron import reindex_collections_task
+from compat.cron import compatibility_report_task
 from lib.es.models import Reindexing
 from lib.es.utils import database_flagged
 from stats.search import setup_indexes as put_stats_mapping
-from users.cron import reindex_users
+from users.cron import reindex_users_task
 _INDEXES = {}
+@task(ignore_result=False)
 def index_stats(index=None, aliased=True):
     """Indexes the previous 365 days."""
     call_command('index_stats', addons=None)
@@ -40,15 +40,14 @@ if django_settings.MARKETPLACE:
     # then tries to delete from the non-existant table.
     #
     # This really only affects tests where the table does not exist.
-    from mkt.stats.cron import index_mkt_stats
+    from mkt.stats.cron import index_mkt_stats_task
     from mkt.stats.search import setup_mkt_indexes as put_mkt_stats_mapping
-    _INDEXES = {'stats': [index_stats, index_mkt_stats],
-                'apps': [reindex_addons,
-                         reindex_apps,
-                         reindex_collections,
-                         reindex_users,
-                         compatibility_report]}
+    _INDEXES = {'stats': [index_stats.si, index_mkt_stats_task.si],
+                'apps': [reindex_addons_task,
+                         reindex_collections_task,
+                         reindex_users_task,
+                         compatibility_report_task]}
 logger = logging.getLogger('z.elasticsearch')
 DEFAULT_NUM_REPLICAS = 0
@@ -93,7 +92,7 @@ def log(msg):
     print msg
-@task_with_callbacks
+@task(ignore_result=False)
 def delete_indexes(indexes):
     """Removes the indexes.
@@ -105,7 +104,7 @@ def delete_indexes(indexes):
         call_es(index, method='DELETE')
-@task_with_callbacks
+@task(ignore_result=False)
 def run_aliases_actions(actions):
     """Run actions on aliases.
@@ -143,7 +142,7 @@ def run_aliases_actions(actions):
     call_es('_aliases', post_data, method='POST')
-@task_with_callbacks
+@task(ignore_result=False)
 def create_mapping(new_index, alias, num_replicas=DEFAULT_NUM_REPLICAS,
                    num_shards=DEFAULT_NUM_SHARDS):
     """Creates a mapping for the new index.
@@ -186,7 +185,6 @@ def create_mapping(new_index, alias, num_replicas=DEFAULT_NUM_REPLICAS,
             status=(200, 201))
-@task_with_callbacks
 def create_index(index, is_stats):
     """Create the index.
@@ -194,19 +192,12 @@ def create_index(index, is_stats):
     - is_stats: if True, we're indexing stats
     """
     log('Running all indexes for %r' % index)
-    indexers = is_stats and _INDEXES['stats'] or _INDEXES['apps']
+    indexers = _INDEXES['stats' if is_stats else 'apps']
+    #TODO look at task failure states? chord?
+    ts = [indexer(index, aliased=False) for indexer in indexers]
+    return group(ts)
-    for indexer in indexers:
-        log('Indexing %r' % indexer.__name__)
-        try:
-            indexer(index, aliased=False)
-        except Exception:
-            # We want to log this event but continue
-            log('Indexer %r failed' % indexer.__name__)
-            traceback.print_exc()
-@task_with_callbacks
+@task(ignore_result=False)
 def flag_database(new_index, old_index, alias):
     """Flags the database to indicate that the reindexing has started."""
     log('Flagging the database to start the reindexation')
@@ -215,7 +206,7 @@ def flag_database(new_index, old_index, alias):
                              start_date=datetime.datetime.now())
-@task_with_callbacks
+@task(ignore_result=False)
 def unflag_database():
     """Unflag the database to indicate that the reindexing is over."""
     log('Unflagging the database')
@@ -304,10 +295,9 @@ class Command(BaseCommand):
         # creating a task tree
         log('Building the task tree')
-        tree = TaskTree()
-        last_action = None
         to_remove = []
+        creates = []
         # for each index, we create a new time-stamped index
         for alias in indexes:
@@ -333,34 +323,31 @@ class Command(BaseCommand):
             old_index = alias
             # flag the database
-            step1 = tree.add_task(flag_database, args=[new_index, old_index,
-                                                       alias])
-            step2 = step1.add_task(create_mapping, args=[new_index, alias])
-            step3 = step2.add_task(create_index, args=[new_index, is_stats])
-            last_action = step3
+            step1 = flag_database.si(new_index, old_index, alias)
+            step2 = create_mapping.si(new_index, alias)
+            step3 = create_index(new_index, is_stats)
+            creates.append(step1 | step2 | step3)
             # adding new index to the alias
             add_action('add', new_index, alias)
+        create = group(creates)
         # Alias the new index and remove the old aliases, if any.
-        renaming_step = last_action.add_task(run_aliases_actions,
-                                             args=[actions])
+        rename = run_aliases_actions.si(actions)
         # unflag the database - there's no need to duplicate the
         # indexing anymore
-        delete = renaming_step.add_task(unflag_database)
+        delete = unflag_database.si()
         # Delete the old indexes, if any
-        delete.add_task(delete_indexes, args=[to_remove])
+        del_indexes = delete_indexes.si(to_remove)
         # let's do it
         log('Running all indexation tasks')
        os.environ['FORCE_INDEXING'] = '1'
         try:
-            tree.apply_async()
-            time.sleep(10)  # give celeryd some time to flag the DB
-            while database_flagged():
+            res = (create | rename | delete | del_indexes).apply_async()
+            while not res.ready():
                 sys.stdout.write('.')
                 sys.stdout.flush()
                 time.sleep(5)
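The final pipeline chains a `group` of per-index create chains into the rename/unflag/delete steps. When a group is chained into another task, celery upgrades that link to a chord, so `rename` should only start once every create chain has finished; this depends on a working result backend, which the settings change below provides (and is likely what the `#TODO ... chord?` note in `create_index` is probing at). A schematic sketch with stand-in tasks:

from celery import group, task

@task(ignore_result=False)
def step(name):
    return name

create = group(step.si('create-%d' % i) for i in range(3))
rename = step.si('rename')
delete = step.si('unflag')
del_indexes = step.si('delete-old')

# group | task upgrades to a chord: rename waits for all of create.
res = (create | rename | delete | del_indexes).apply_async()
print res.ready()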

View file

@@ -1061,7 +1061,7 @@ BROKER_PASSWORD = 'zamboni'
 BROKER_VHOST = 'zamboni'
 BROKER_CONNECTION_TIMEOUT = 0.1
 CELERY_RESULT_BACKEND = 'amqp'
-CELERY_IGNORE_RESULT = True
+CELERY_IGNORE_RESULT = False
 CELERY_SEND_TASK_ERROR_EMAILS = True
 CELERYD_LOG_LEVEL = logging.INFO
 CELERYD_HIJACK_ROOT_LOGGER = False
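Storing results is the price of the new orchestration: `res.ready()` and the implicit chord both query the result backend, so the global `CELERY_IGNORE_RESULT` flips to `False` and the pipeline tasks double up with `ignore_result=False`. With the amqp backend each result lives in its own short-lived queue, so capping their lifetime can be worthwhile; a hedged sketch of a companion setting, not part of this commit:

CELERY_RESULT_BACKEND = 'amqp'
CELERY_IGNORE_RESULT = False
# Hypothetical addition: expire amqp result queues after an hour instead
# of celery's one-day default, since results are only polled briefly.
CELERY_TASK_RESULT_EXPIRES = 60 * 60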

View file

@@ -1,5 +1,7 @@
 import datetime
+from celery import task
 from django.core.management import call_command
 import commonware.log
@@ -42,5 +44,9 @@ def index_latest_mkt_stats(index=None, aliased=True):
 @cronjobs.register
 def index_mkt_stats(index=None, aliased=True):
+    index_mkt_stats_task(index, aliased)()
+@task(ignore_result=False)
+def index_mkt_stats_task(index=None, aliased=True):
     cron_log.info('index_mkt_stats')
     call_command('index_mkt_stats', addons=None, date=None)
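`index_mkt_stats_task` joins `_INDEXES` in the reindex command, where the 'stats' entries are bound `.si` methods and the 'apps' entries are the factory functions: different kinds of callables, but each maps `(index, aliased=False)` to something `group()` accepts. A sketch of that uniform interface, with stand-ins for the commit's functions:

from celery import group, task

@task(ignore_result=False)
def stats_task(index=None, aliased=True):
    pass

def apps_task(index=None, aliased=True):
    return group([])  # the real factories return groups of index tasks

_INDEXES = {'stats': [stats_task.si], 'apps': [apps_task]}
# create_index treats both flavors identically:
ts = [indexer('new_index', aliased=False)
      for indexer in _INDEXES['stats'] + _INDEXES['apps']]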

View file

@@ -96,7 +96,7 @@ tastypie-services==0.2.2
 -e git://github.com/mozilla/elasticutils.git@5bde211443dbe524d940974d477119f51e3ac357#egg=elasticutils
 # Temporary fork.
 -e git://github.com/jsocol/jingo-minify.git@cb911038e20a399aeff871e199c8bc2be12b5cee#egg=jingo-minify
--e git://github.com/mozilla/nuggets.git@96e80a64aa4bfcfef4f43fc3ab6966450ccd7325#egg=nuggets
+-e git://github.com/mozilla/nuggets.git@95cbddcc871fbdb7f67a0c115767304ffc183f93#egg=nuggets
 -e git://github.com/jbalogh/test-utils.git@ce5136a257cd44a1c663319124a255c1d10a9834#egg=test-utils
 -e git://github.com/fwenzel/django-mozilla-product-details.git@36ef06539d6b34c4f345fd0d3e16937d0db9a752#egg=django-mozilla-product-details
 -e git://github.com/mozilla/signing-clients@a8bd730b202391c080113d224d223463e03088e9#egg=signing-clients