switching from establish_connection to TaskSet (bug 663555)

This commit is contained in:
Jeff Balogh 2011-06-13 11:24:12 -07:00
Родитель 0bea441f3f
Коммит ba0a3a4f35
17 изменённых файлов: 94 добавлений и 349 удалений

Просмотреть файл

@ -9,12 +9,12 @@ from datetime import datetime, timedelta
from django.conf import settings
from django.db import connections, transaction
from django.db.models import Q, F, Avg, Count
from django.db.models import Q, F, Avg
import multidb
import path
import recommend
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
from celeryutils import task
import amo
@ -82,10 +82,9 @@ def update_addons_current_version():
status__in=amo.VALID_STATUSES)
.exclude(type=amo.ADDON_PERSONA).values_list('id'))
with establish_connection() as conn:
for chunk in chunked(d, 100):
_update_addons_current_version.apply_async(args=[chunk],
connection=conn)
ts = [_update_addons_current_version.subtask(args=[chunk])
for chunk in chunked(d, 100)]
TaskSet(ts).apply_async()
@task(rate_limit='20/m')
@ -116,10 +115,9 @@ def update_addon_average_daily_users():
d = cursor.fetchall()
cursor.close()
with establish_connection() as conn:
for chunk in chunked(d, 1000):
_update_addon_average_daily_users.apply_async(args=[chunk],
connection=conn)
ts = [_update_addon_average_daily_users.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
@task(rate_limit='15/m')
@ -147,10 +145,9 @@ def update_addon_download_totals():
d = cursor.fetchall()
cursor.close()
with establish_connection() as conn:
for chunk in chunked(d, 1000):
_update_addon_download_totals.apply_async(args=[chunk],
connection=conn)
ts = [_update_addon_download_totals.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
@task(rate_limit='15/m')
@ -211,9 +208,9 @@ def update_addon_appsupport():
ids = (Addon.objects.valid().no_cache().distinct()
.filter(newish, good).values_list('id', flat=True))
with establish_connection() as conn:
for chunk in chunked(ids, 20):
_update_appsupport.apply_async(args=[chunk], connection=conn)
ts = [_update_appsupport.subtask(args=[chunk])
for chunk in chunked(ids, 20)]
TaskSet(ts).apply_async()
@cronjobs.register
@ -437,18 +434,6 @@ def reindex_addons():
.filter(_current_version__isnull=False,
status__in=amo.VALID_STATUSES,
disabled_by_user=False))
with establish_connection() as conn:
for chunk in chunked(sorted(list(ids)), 150):
tasks.index_addons.apply_async(args=[chunk], connection=conn)
# TODO(jbalogh): remove after 6.0.12 (bug 659948)
@cronjobs.register
def fix_dupe_appsupport():
from . import tasks
# Find all the appsupport (addon, app) rows with duplicate entries.
qs = (AppSupport.objects.values('addon', 'app')
.annotate(cnt=Count('id')).filter(cnt__gt=1))
addons = set(a['addon'] for a in qs)
# Update appsupport again to fix the dupes.
tasks.update_appsupport(addons)
ts = [tasks.index_addons.subtask(args=[chunk])
for chunk in chunked(sorted(list(ids)), 150)]
TaskSet(ts).apply_async()

Просмотреть файл

@ -8,8 +8,6 @@ from addons.tasks import fix_get_satisfaction
from amo.utils import chunked
from devhub.tasks import flag_binary
from celery.messaging import establish_connection
tasks = {
'flag_binary': {'method': flag_binary, 'qs': []},
'fix_get_satisfaction': {
@ -37,6 +35,5 @@ class Command(BaseCommand):
pks = (Addon.objects.filter(*task['qs'])
.values_list('pk', flat=True)
.order_by('id'))
with establish_connection():
for chunk in chunked(pks, 100):
task['method'].delay(chunk)
for chunk in chunked(pks, 100):
task['method'].delay(chunk)

Просмотреть файл

@ -7,9 +7,7 @@ from datetime import datetime, timedelta
from subprocess import Popen, PIPE
from django.conf import settings
from django.db.models import Count
from celery.messaging import establish_connection
from celeryutils import task
import cronjobs
import commonware.log
@ -18,7 +16,7 @@ import redisutils
import amo
from amo.utils import chunked
from addons.models import Addon, AddonCategory, BlacklistedGuid, Category
from addons.models import Addon, AddonCategory
from addons.utils import AdminActivityLogMigrationTracker, MigrationTracker
from applications.models import Application, AppVersion
from bandwagon.models import Collection
@ -34,55 +32,6 @@ from users.models import UserProfile
log = commonware.log.getLogger('z.cron')
# TODO(davedash): Delete me after this has been run.
@cronjobs.register
def remove_extra_cats():
"""
Remove 'misc' category if other categories are present.
Remove categories in excess of two categories.
"""
# Remove misc categories from addons if they are also in other categories
# for that app.
for cat in Category.objects.filter(misc=True):
# Find all the add-ons in this category.
addons_in_misc = cat.addon_set.values_list('id', flat=True)
delete_me = []
# Count the categories they have per app.
cat_count = (AddonCategory.objects.values('addon')
.annotate(num_cats=Count('category'))
.filter(num_cats__gt=1, addon__in=addons_in_misc,
category__application=cat.application_id))
delete_me = [item['addon'] for item in cat_count]
log.info('Removing %s from %d add-ons' % (cat, len(delete_me)))
(AddonCategory.objects.filter(category=cat, addon__in=delete_me)
.delete())
with establish_connection() as conn:
# Remove all but 2 categories from everything else, per app
for app in amo.APP_USAGE:
# SELECT
# `addons_categories`.`addon_id`,
# COUNT(`addons_categories`.`category_id`) AS `num_cats`
# FROM
# `addons_categories` INNER JOIN `categories` ON
# (`addons_categories`.`category_id` = `categories`.`id`)
# WHERE
# (`categories`.`application_id` = 1 )
# GROUP BY
# `addons_categories`.`addon_id`
# HAVING COUNT(`addons_categories`.`category_id`) > 2
log.info('Examining %s add-ons' % unicode(app.pretty))
results = (AddonCategory.objects
.filter(category__application=app.id)
.values('addon_id').annotate(num_cats=Count('category'))
.filter(num_cats__gt=2))
for chunk in chunked(results, 100):
_trim_categories.apply_async(args=[chunk, app.id],
connection=conn)
@task
def _trim_categories(results, app_id, **kw):
"""
@ -133,18 +82,14 @@ def gc(test_result=True):
created__lt=days_ago(4))
.values_list('id', flat=True))
with establish_connection() as conn:
for chunk in chunked(logs, 100):
_delete_logs.apply_async(args=[chunk], connection=conn)
for chunk in chunked(contributions_to_delete, 100):
_delete_stale_contributions.apply_async(
args=[chunk], connection=conn)
for chunk in chunked(collections_to_delete, 100):
_delete_anonymous_collections.apply_async(
args=[chunk], connection=conn)
for chunk in chunked(addons_to_delete, 100):
_delete_incomplete_addons.apply_async(
args=[chunk], connection=conn)
for chunk in chunked(logs, 100):
_delete_logs.delay(chunk)
for chunk in chunked(contributions_to_delete, 100):
_delete_stale_contributions.delay(args=chunk)
for chunk in chunked(collections_to_delete, 100):
_delete_anonymous_collections.delay(args=chunk)
for chunk in chunked(addons_to_delete, 100):
_delete_incomplete_addons.delay(args=chunk)
log.debug('Cleaning up sharing services.')
AddonShareCount.objects.exclude(
@ -418,113 +363,3 @@ def ping(**kw):
queue = kw['delivery_info']['routing_key']
log.info('[1@None] Checking the %s queue' % queue)
QueueCheck().set('pong', queue)
# TODO(andym): remove this once they are all gone.
@cronjobs.register
def delete_brand_thunder_addons():
ids = (102188, 102877, 103381, 103382, 103388, 107864, 109233, 109242,
111144, 111145, 115970, 150367, 146373, 143547, 142886, 140931,
113511, 100304, 130876, 126516, 124495, 123900, 120683, 159626,
159625, 157780, 157776, 155494, 155489, 155488, 152740, 152739,
151187, 193275, 184048, 182866, 179429, 179426, 161783, 161781,
161727, 160426, 160425, 220155, 219726, 219724, 219723, 219722,
218413, 200756, 200755, 199904, 221522, 221521, 221520, 221513,
221509, 221508, 221505, 220882, 220880, 220879, 223384, 223383,
223382, 223381, 223380, 223379, 223378, 223376, 222194, 221524,
223403, 223402, 223400, 223399, 223398, 223388, 223387, 223386,
223385, 232687, 232681, 228394, 228393, 228392, 228391, 228390,
226428, 226427, 226388, 235892, 235836, 235277, 235276, 235274,
232709, 232708, 232707, 232694, 232688, 94461, 94452, 54288, 50418,
49362, 49177, 239113, 102186, 102185, 101166, 101165, 101164,
99010, 99007, 99006, 98429, 98428, 45834, 179542, 103383)
guids = (
'umespersona_at_brandthunder.com', 'vanderbiltupersona_at_brandthunder.com',
'michiganstupersona_at_brandthunder.com', 'acconfpersona_at_brandthunder.com',
'uofarizonapersona_at_brandthunder.com', 'uofcincinnatipersona_at_brandthunder.com',
'texastechupersona_at_brandthunder.com', 'uofkansaspersona_at_brandthunder.com',
'uofpittsburghpersona_at_brandthunder.com', 'uofgeorgiapersona_at_brandthunder.com',
'halloween2010persona_at_brandthunder.com', 'halloweenpersona_at_brandthunder.com',
'uofscarolinapersona_at_brandthunder.com', 'auburnupersona_at_brandthunder.com',
'georgetownupersona_at_brandthunder.com', 'ncstateupersona_at_brandthunder.com',
'uofmissouripersona_at_brandthunder.com', 'uoftennesseepersona_at_brandthunder.com',
'washingtonstupersona_at_brandthunder.com',
'uofnotredamepersona_at_brandthunder.com',
'nasapersona_at_brandthunder.com', 'uofmichiganpersona_at_brandthunder.com',
'villanovaupersona_at_brandthunder.com', 'uofillinoispersona_at_brandthunder.com',
'oklahomastupersona_at_brandthunder.com', 'uofwisconsinpersona_at_brandthunder.com',
'uofwashingtonpersona_at_brandthunder.com', 'uclapersona_at_brandthunder.com',
'arizonastupersona_at_brandthunder.com', 'uofncarolinapersona_at_brandthunder.com',
'bigtenconfpersona_at_brandthunder.com', 'indianaupersona_at_brandthunder.com',
'purdueupersona_at_brandthunder.com', 'pennstupersona_at_brandthunder.com',
'uoflouisvillepersona_at_brandthunder.com', 'marquetteupersona_at_brandthunder.com',
'uofiowapersona_at_brandthunder.com', 'wakeforestunivpersona_at_brandthunder.com',
'stanfordupersona_at_brandthunder.com', 'providencecollpersona_at_brandthunder.com',
'kansasstupersona_at_brandthunder.com', 'uoftexaspersona_at_brandthunder.com',
'uofcaliforniapersona_at_brandthunder.com', 'oregonstupersona_at_brandthunder.com',
'gatechpersona_at_brandthunder.com', 'depaulupersona_at_brandthunder.com',
'uofalabamapersona_at_brandthunder.com', 'stjohnsupersona_at_brandthunder.com',
'uofmiamipersona_at_brandthunder.com', 'flastatepersona_at_brandthunder.com',
'uofconnecticutpersona_at_brandthunder.com',
'uofoklahomapersona_at_brandthunder.com',
'baylorupersona_at_brandthunder.com', 'stackpersona_at_brandthunder.com',
'askmenboom_at_askmen.com', 'uscpersona_at_brandthunder.com',
'redbullspersona_at_brandthunder.com', 'huffpostpersona_at_brandthunder.com',
'mlsunionpersona_at_brandthunder.com', 'goblinspersona2_at_brandthunder.com',
'ignboom_at_ign.com', 'fantasyrpgtheme_at_brandthunder.com',
'dragontheme_at_brandthunder.com', 'animetheme_at_brandthunder.com',
'sanjeevkapoorboom_at_sanjeevkapoor.com', 'godukeboom_at_goduke.com',
'nbakingsboom_at_nba.com', 'prowrestlingboom_at_brandthunder.com',
'plaidthemetheme_at_brandthunder.com', 'fleurdelistheme_at_brandthunder.com',
'snowthemetheme_at_brandthunder.com', 'transparenttheme_at_brandthunder.com',
'nauticaltheme_at_brandthunder.com', 'sierrasunsettheme_at_brandthunder.com',
'hotgirlbodytheme_at_brandthunder.com', 'ctrlaltdelboom_at_cad-comic.com',
'cricketboom_at_brandthunder.com', 'starrynighttheme_at_brandthunder.com',
'fantasyflowertheme_at_brandthunder.com', 'militarycamotheme_at_brandthunder.com',
'paristhemetheme_at_brandthunder.com', 'greatwalltheme_at_brandthunder.com',
'motorcycle_at_brandthunder.com', 'fullspeedboom_at_fullspeed2acure.com',
'waterfalls_at_brandthunder.com', 'mothersday2010boom_at_brandthunder.com',
'pyramids_at_brandthunder.com', 'mountain_at_brandthunder.com',
'beachsunset_at_brandthunder.com', 'newyorkcity_at_brandthunder.com',
'shinymetal_at_brandthunder.com', 'moviepremiereboom_at_brandthunder.com',
'kitttens_at_brandthunder.com', 'tulips_at_brandthunder.com',
'aquarium_at_brandthunde.com', # [sic]
'wood_at_brandthunder.com', 'puppies_at_brandthunder.com', 'ouaboom_at_oua.ca',
'wibwboom_at_wibw.com', 'nasasettingsun_at_brandthunder.com',
'bluesky_at_brandthunder.com',
'cheerleaders_at_brandthunder.com', 'greengrass_at_brandthunder.com',
'crayonpinktheme_at_brandthunder.com', 'crayonredtheme_at_brandthunder.com',
'crayonyellow_at_brandthunder.com', 'crayongreen_at_brandthunder.com',
'crayonblue_at_brandthunder.com', 'weatherboom_at_brandthunder.com',
'crayonblack_at_brandthunder.com', 'ambientglow_at_brandthunder.com',
'bubbles_at_brandthunder.com', 'matrixcode_at_brandthunder.com',
'firetheme_at_brandthunder.com', 'neonlights_at_brandthunder.com',
'brushedmetal_at_brandthunder.com', 'sugarland2_at_brandthunder.com',
'suns2_at_brandthunder.com', 'thanksgiving2_at_brandthunder.com',
'ecoboom2_at_brandthunder.com', 'thanksgivingboom_at_brandthunder.com')
guids = [guid.replace('_at_', '@') for guid in guids]
# This is a bit of an atomic bomb approach, but should ensure
# that no matter what the state of the guids or addons on AMO.
# We will end up with no addons or guids relating to Brand Thunder.
#
# Clean out any that may exist prior to deleting addons (was causing
# errors on preview).
blacklist = BlacklistedGuid.uncached.filter(guid__in=guids)
log.info('Found %s guids to delete (bug 636834)'
% blacklist.count())
blacklist.delete()
addons = Addon.uncached.filter(pk__in=ids)
log.info('Found %s addons to delete (bug 636834)' % addons.count())
for addon in addons:
try:
log.info('About to delete addon %s (bug 636834)' % addon.id)
addon.delete('Deleting per Brand Thunder request (bug 636834).')
except:
log.error('Could not delete add-on %d (bug 636834)' % addon.id,
exc_info=True)
# Then clean out any remaining blacklisted guids after being run.
blacklist = BlacklistedGuid.uncached.filter(guid__in=guids)
log.info('Found %s guids to delete (bug 636834)'
% blacklist.count())
blacklist.delete()

Просмотреть файл

@ -4,7 +4,7 @@ from django.conf import settings
from django.core.management.base import BaseCommand
import path
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
import bandwagon.tasks
import users.tasks
@ -25,11 +25,12 @@ class Command(BaseCommand):
self.fix(base, task)
def fix(self, base, task):
with establish_connection() as cxn:
print 'Searching the nfs...'
files = list(path.path(base).walkfiles('*%s' % suffix))
print '%s busted files under %s.' % (len(files), base)
for src in files:
dst = src.replace(suffix, '')
log.info('Resizing %s to %s' % (src, dst))
task.apply_async(args=[src, dst], connection=cxn)
print 'Searching the nfs...'
files = list(path.path(base).walkfiles('*%s' % suffix))
print '%s busted files under %s.' % (len(files), base)
ts = []
for src in files:
dst = src.replace(suffix, '')
log.info('Resizing %s to %s' % (src, dst))
ts.append(task.subtask(args=[src, dst]))
TaskSet(ts).apply_async()

Просмотреть файл

@ -5,7 +5,7 @@ from django.db import connection, transaction
from django.db.models import Count
import commonware.log
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
from celeryutils import task
import amo
@ -59,10 +59,9 @@ def update_collections_subscribers():
.annotate(count=Count('collection'))
.extra(where=['DATE(created)=%s'], params=[date.today()]))
with establish_connection() as conn:
for chunk in chunked(d, 1000):
_update_collections_subscribers.apply_async(args=[chunk],
connection=conn)
ts = [_update_collections_subscribers.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
@task(rate_limit='15/m')
@ -85,9 +84,9 @@ def _update_collections_subscribers(data, **kw):
def collection_meta():
from . import tasks
collections = Collection.objects.values_list('id', flat=True)
with establish_connection() as conn:
for chunk in chunked(collections, 1000):
tasks.cron_collection_meta.apply_async(args=chunk, connection=conn)
ts = [tasks.cron_collection_meta.subtask(args=chunk)
for chunk in chunked(collections, 1000)]
TaskSet(ts).apply_async()
@cronjobs.register
@ -104,14 +103,13 @@ def update_collections_votes():
.filter(vote=-1)
.extra(where=['DATE(created)=%s'], params=[date.today()]))
with establish_connection() as conn:
for chunk in chunked(up, 1000):
_update_collections_votes.apply_async(args=[chunk, "new_votes_up"],
connection=conn)
for chunk in chunked(down, 1000):
_update_collections_votes.apply_async(args=[chunk,
"new_votes_down"],
connection=conn)
ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_up'])
for chunk in chunked(up, 1000)]
TaskSet(ts).apply_async()
ts = [_update_collections_votes.subtask(args=[chunk, 'new_votes_down'])
for chunk in chunked(down, 1000)]
TaskSet(ts).apply_async()
@task(rate_limit='15/m')

Просмотреть файл

@ -1,12 +1,11 @@
from django.core.management.base import BaseCommand
from addons.models import Addon
from celery.task.sets import TaskSet
from amo.utils import chunked
from devhub.models import ActivityLog
from editors.tasks import add_versionlog
from celery.messaging import establish_connection
class Command(BaseCommand):
help = 'Add a VersionLog entry for all ActivityLog items'
@ -15,6 +14,6 @@ class Command(BaseCommand):
pks = (ActivityLog.objects.review_queue().values_list('pk', flat=True)
.order_by('id'))
with establish_connection():
for chunk in chunked(pks, 100):
add_versionlog.delay(chunk)
ts = [add_versionlog.subtask(args=[chunk])
for chunk in chunked(pks, 100)]
TaskSet(ts).apply_async()

Просмотреть файл

@ -7,11 +7,6 @@ from django.conf import settings
import cronjobs
import commonware.log
from celery.messaging import establish_connection
import amo.utils
from . import tasks
from .models import File
log = commonware.log.getLogger('z.cron')

Просмотреть файл

@ -4,7 +4,7 @@ from itertools import groupby
from django.db.models import Max
import cronjobs
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
from amo.utils import chunked
from .models import Performance
@ -26,7 +26,6 @@ def update_perf():
baseline = dict((os, avg) for _, os, avg in qs.filter(addon=None))
with establish_connection() as conn:
for chunk in chunked(results, 25):
tasks.update_perf.apply_async(args=[baseline, chunk],
connection=conn)
ts = [tasks.update_perf.subtask(args=[baseline, chunk])
for chunk in chunked(results, 25)]
TaskSet(ts).apply_async()

Просмотреть файл

@ -1,6 +1,6 @@
import logging
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
import cronjobs
from amo.utils import chunked
@ -16,16 +16,15 @@ log = logging.getLogger('z.cron')
def reviews_denorm():
"""Set is_latest and previous_count for all reviews."""
pairs = list(set(Review.objects.values_list('addon', 'user')))
with establish_connection() as conn:
for chunk in chunked(pairs, 50):
tasks.update_denorm.apply_async(args=chunk, connection=conn)
ts = [tasks.update_denorm.subtask(args=chunk)
for chunk in chunked(pairs, 50)]
TaskSet(ts).apply_async()
@cronjobs.register
def addon_reviews_ratings():
"""Update all add-on total_reviews and average/bayesian ratings."""
addons = Addon.objects.values_list('id', flat=True)
with establish_connection() as conn:
for chunk in chunked(addons, 100):
tasks.cron_review_aggregate.apply_async(args=chunk,
connection=conn)
ts = [tasks.cron_review_aggregate.subtask(args=chunk)
for chunk in chunked(addons, 100)]
TaskSet(ts).apply_async()

Просмотреть файл

@ -3,7 +3,7 @@ import datetime
from django.db.models import Sum, Max
import commonware.log
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
import cronjobs
from amo.utils import chunked
@ -22,10 +22,9 @@ def update_addons_collections_downloads():
d = (AddonCollectionCount.objects.values('addon', 'collection')
.annotate(sum=Sum('count')))
with establish_connection() as conn:
for chunk in chunked(d, 600):
tasks.update_addons_collections_downloads.apply_async(
args=[chunk], connection=conn)
ts = [tasks.update_addons_collections_downloads.subtask(args=[chunk])
for chunk in chunked(d, 600)]
TaskSet(ts).apply_async()
@cronjobs.register
@ -35,10 +34,9 @@ def update_collections_total():
d = (CollectionCount.objects.values('collection_id')
.annotate(sum=Sum('count')))
with establish_connection() as conn:
for chunk in chunked(d, 1000):
tasks.update_collections_total.apply_async(args=[chunk],
connection=conn)
ts = [tasks.update_collections_total.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
@cronjobs.register
@ -53,15 +51,14 @@ def update_global_totals(date=None):
metrics_jobs = [dict(job=job, date=max_update) for job in
tasks._get_metrics_jobs(date)]
with establish_connection() as conn:
for kw in today_jobs + metrics_jobs:
tasks.update_global_totals.apply_async(kwargs=kw, connection=conn)
ts = [tasks.update_global_totals.subtask(kwargs=kw)
for kw in today_jobs + metrics_jobs]
TaskSet(ts).apply_async()
@cronjobs.register
def addon_total_contributions():
addons = Addon.objects.values_list('id', flat=True)
with establish_connection() as conn:
for chunk in chunked(addons, 100):
tasks.cron_total_contributions.apply_async(args=chunk,
connection=conn)
ts = [tasks.cron_total_contributions.subtask(args=chunk)
for chunk in chunked(addons, 100)]
TaskSet(ts).apply_async()

Просмотреть файл

@ -1,6 +1,7 @@
from django.core.management.base import BaseCommand, CommandError
from optparse import make_option
from celery.messaging import establish_connection
from django.core.management.base import BaseCommand, CommandError
import commonware.log
## FIXME: reasonable name?
@ -33,11 +34,7 @@ class Command(BaseCommand):
if options.get('simulate') and options.get('queue'):
raise CommandError('Cannot use --simulate and --queue together')
if options.get('queue'):
with establish_connection() as conn:
update_to_json.apply_async(max_objs=max_objs,
connection=conn,
classes=classes,
ids=ids)
update_to_json.delay(max_objs=max_objs, classes=classes, ids=ids)
else:
updater = _JSONUpdater(max_objs, log, self.after_exit,
classes=classes, ids=ids,

Просмотреть файл

@ -5,7 +5,6 @@ from django.db.models import Sum, Max
import commonware.log
from celery.decorators import task
from celery.messaging import establish_connection
import amo
from addons.models import Addon
@ -192,9 +191,7 @@ def update_to_json(max_objs=None, classes=(), ids=(), **kw):
def after_max_redo(msg):
log.info('Completed run: %s' % msg)
with establish_connection() as conn:
update_to_json.apply_async(max_objs=max_objs,
connection=conn)
update_to_json.delay(max_objs=max_objs)
updater = _JSONUpdater(max_objs, log, after_max_redo,
classes=classes, ids=ids)

Просмотреть файл

@ -1,30 +0,0 @@
from optparse import make_option
from django.core.management.base import BaseCommand
from amo.utils import chunked
from tags.models import Tag, TagStat
from tags.tasks import update_all_tag_stats
from celery.messaging import establish_connection
class Command(BaseCommand):
help = 'Migration to repopulate tag stats as per 635118'
option_list = BaseCommand.option_list + (
make_option('--delete', action='store_true',
dest='delete', help='Deletes all tag counts.'),
)
def handle(self, *args, **kw):
delete = kw.get('delete')
if delete:
print "Deleting all tag counts."
TagStat.objects.all().delete()
pks = list(Tag.objects.filter(blacklisted=False)
.values_list('pk', flat=True).order_by('pk'))
print "Found: %s tags, adding to celery." % len(pks)
with establish_connection() as conn:
for chunk in chunked(pks, 100):
update_all_tag_stats.delay(chunk)

Просмотреть файл

@ -1,3 +0,0 @@
from django.core.management.base import BaseCommand
from celery.messaging import establish_connection

Просмотреть файл

@ -2,7 +2,7 @@ from django.db import connections
import commonware.log
import multidb
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
from celeryutils import task
import cronjobs
@ -41,10 +41,9 @@ def update_user_ratings():
d = cursor.fetchall()
cursor.close()
with establish_connection() as conn:
for chunk in chunked(d, 1000):
_update_user_ratings.apply_async(args=[chunk],
connection=conn)
ts = [_update_user_ratings.subtask(args=[chunk])
for chunk in chunked(d, 1000)]
TaskSet(ts).apply_async()
@task(rate_limit='15/m')

Просмотреть файл

@ -1,20 +0,0 @@
from django.core.management.base import BaseCommand
from celery.messaging import establish_connection
from amo.utils import chunked
from versions.models import Version
from versions.tasks import add_version_int
# TODO(andym): remove this when versions all done.
class Command(BaseCommand):
help = 'Upgrade the version model to have a version_int'
def handle(self, *args, **kw):
qs = Version.objects.filter(version_int=None)
print 'Found %s versions that need updating' % qs.count()
with establish_connection() as conn:
for pks in chunked(list(qs.values_list('pk', flat=True)), 1000):
add_version_int.delay(pks)
print '... added to celery.'

Просмотреть файл

@ -84,11 +84,11 @@ database.
If we run this command like so: ::
from celery.messaging import establish_connection
from celery.task.sets import TaskSet
with establish_connection() as conn:
_update_addon_average_daily_users.apply_async(args=[pks],
connection=conn)
ts = [_update_addon_average_daily_users.subtask(args=[pks])
for pks in amo.utils.chunked(all_pks, 300)]
TaskSet(ts).apply_async()
All the Addons with ids in ``pks`` will (eventually) have their
``current_versions`` updated.