bug 569742, move stats tasks to tasks.py
This will make cron.py just list the cron jobs, and tasks list all tasks regardless of whether they are used in cron. This also adds a new task to update total_contributions... I did this out of order otherwise it would have been two commits. This task can be called via cron or whenever Contributions is updated.
This commit is contained in:
Родитель
cbc7570f32
Коммит
80dd076d99
|
@ -152,6 +152,11 @@ class Addon(amo.models.ModelBase):
|
|||
nullify_invalid=True,
|
||||
blank=True, null=True,
|
||||
help_text="Requested donation amount.")
|
||||
|
||||
total_contributions = DecimalCharField(max_digits=8, decimal_places=2,
|
||||
nullify_invalid=True, blank=True,
|
||||
null=True)
|
||||
|
||||
annoying = models.PositiveIntegerField(choices=CONTRIB_CHOICES, default=0)
|
||||
enable_thankyou = models.BooleanField(default=False,
|
||||
help_text="Should the thankyou note be sent to contributors?")
|
||||
|
|
|
@ -1,22 +1,16 @@
|
|||
import datetime
|
||||
|
||||
from django.db import connection, transaction
|
||||
from django.db.models import Max, Sum
|
||||
|
||||
import commonware.log
|
||||
from celery.decorators import task
|
||||
from celery.messaging import establish_connection
|
||||
|
||||
import amo
|
||||
import cronjobs
|
||||
from addons.models import Addon
|
||||
from amo.utils import chunked
|
||||
from bandwagon.models import Collection, CollectionAddon
|
||||
from reviews.models import Review
|
||||
from versions.models import Version
|
||||
from users.models import UserProfile
|
||||
from addons.models import Addon
|
||||
from bandwagon.models import CollectionAddon
|
||||
from .models import (AddonCollectionCount, CollectionCount,
|
||||
DownloadCount, UpdateCount)
|
||||
UpdateCount)
|
||||
from . import tasks
|
||||
|
||||
task_log = commonware.log.getLogger('z.task')
|
||||
|
||||
|
@ -30,18 +24,8 @@ def update_addons_collections_downloads():
|
|||
|
||||
with establish_connection() as conn:
|
||||
for chunk in chunked(d, 600):
|
||||
_update_addons_collections_downloads.apply_async(args=[chunk],
|
||||
connection=conn)
|
||||
|
||||
|
||||
@task(rate_limit='10/m')
|
||||
def _update_addons_collections_downloads(data, **kw):
|
||||
task_log.info("[%s@%s] Updating addons+collections download totals." %
|
||||
(len(data), _update_addons_collections_downloads.rate_limit))
|
||||
for var in data:
|
||||
(CollectionAddon.objects.filter(addon=var['addon'],
|
||||
collection=var['collection'])
|
||||
.update(downloads=var['sum']))
|
||||
tasks.update_addons_collections_downloads.apply_async(
|
||||
args=[chunk], connection=conn)
|
||||
|
||||
|
||||
@cronjobs.register
|
||||
|
@ -53,17 +37,8 @@ def update_collections_total():
|
|||
|
||||
with establish_connection() as conn:
|
||||
for chunk in chunked(d, 1000):
|
||||
_update_collections_total.apply_async(args=[chunk],
|
||||
connection=conn)
|
||||
|
||||
|
||||
@task(rate_limit='15/m')
|
||||
def _update_collections_total(data, **kw):
|
||||
task_log.info("[%s@%s] Updating collections' download totals." %
|
||||
(len(data), _update_collections_total.rate_limit))
|
||||
for var in data:
|
||||
(Collection.objects.filter(pk=var['collection_id'])
|
||||
.update(downloads=var['sum']))
|
||||
tasks.update_collections_total.apply_async(args=[chunk],
|
||||
connection=conn)
|
||||
|
||||
|
||||
@cronjobs.register
|
||||
|
@ -78,98 +53,13 @@ def update_global_totals(date=None):
|
|||
|
||||
with establish_connection() as conn:
|
||||
for kw in today_jobs + metrics_jobs:
|
||||
_update_global_totals.apply_async(kwargs=kw,
|
||||
connection=conn)
|
||||
tasks.update_global_totals.apply_async(kwargs=kw, connection=conn)
|
||||
|
||||
|
||||
@task(rate_limit='20/h')
|
||||
def _update_global_totals(job, date):
|
||||
task_log.info("[%s] Updating global statistics totals (%s) for (%s)" %
|
||||
(_update_global_totals.rate_limit, job, date))
|
||||
|
||||
jobs = _get_daily_jobs()
|
||||
jobs.update(_get_metrics_jobs())
|
||||
|
||||
num = jobs[job]()
|
||||
|
||||
q = """REPLACE INTO
|
||||
global_stats(`name`, `count`, `date`)
|
||||
VALUES
|
||||
(%s, %s, %s)"""
|
||||
p = [job, num or 0, date]
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(q, p)
|
||||
transaction.commit_unless_managed()
|
||||
|
||||
|
||||
def _get_daily_jobs(date=None):
|
||||
"""Return a dictionary of statisitics queries.
|
||||
|
||||
If a date is specified and applies to the job it will be used. Otherwise
|
||||
the date will default to today().
|
||||
"""
|
||||
|
||||
if not date:
|
||||
date = datetime.date.today()
|
||||
|
||||
extra = dict(where=['DATE(created)=%s'], params=[date])
|
||||
|
||||
# If you're editing these, note that you are returning a function! This
|
||||
# cheesy hackery was done so that we could pass the queries to celery lazily
|
||||
# and not hammer the db with a ton of these all at once.
|
||||
stats = {
|
||||
# Add-on Downloads
|
||||
'addon_total_downloads': lambda: DownloadCount.objects.filter(date__lte=date).aggregate(sum=Sum('count'))['sum'],
|
||||
'addon_downloads_new': lambda: DownloadCount.objects.filter(date=date).aggregate(sum=Sum('count'))['sum'],
|
||||
|
||||
# Add-on counts
|
||||
'addon_count_public': Addon.objects.filter(created__lte=date, status=amo.STATUS_PUBLIC, inactive=0).count,
|
||||
'addon_count_pending': Version.objects.filter(created__lte=date, files__status=amo.STATUS_PENDING).count,
|
||||
'addon_count_experimental': Addon.objects.filter(created__lte=date, status=amo.STATUS_UNREVIEWED, inactive=0).count,
|
||||
'addon_count_nominated': Addon.objects.filter(created__lte=date, status=amo.STATUS_NOMINATED, inactive=0).count,
|
||||
'addon_count_new': Addon.objects.extra(**extra).count,
|
||||
|
||||
# Version counts
|
||||
'version_count_new': Version.objects.extra(**extra).count,
|
||||
|
||||
# User counts
|
||||
'user_count_total': UserProfile.objects.filter(created__lte=date).count,
|
||||
'user_count_new': UserProfile.objects.extra(**extra).count,
|
||||
|
||||
# Review counts
|
||||
'review_count_total': Review.objects.filter(created__lte=date, editorreview=0).count,
|
||||
'review_count_new': Review.objects.filter(editorreview=0).extra(**extra).count,
|
||||
|
||||
# Collection counts
|
||||
'collection_count_total': Collection.objects.filter(created__lte=date).count,
|
||||
'collection_count_new': Collection.objects.extra(**extra).count,
|
||||
'collection_count_private': Collection.objects.filter(listed=0).count,
|
||||
'collection_count_public': Collection.objects.filter(created__lte=date, listed=1).count,
|
||||
'collection_count_autopublishers': Collection.objects.filter(created__lte=date, type=amo.COLLECTION_SYNCHRONIZED).count,
|
||||
'collection_count_editorspicks': Collection.objects.filter(created__lte=date, type=amo.COLLECTION_FEATURED).count,
|
||||
'collection_count_normal': Collection.objects.filter(created__lte=date, type=amo.COLLECTION_NORMAL).count,
|
||||
|
||||
'collection_addon_downloads': lambda: AddonCollectionCount.objects.filter(date__lte=date).aggregate(sum=Sum('count'))['sum'],
|
||||
}
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
def _get_metrics_jobs(date=None):
|
||||
"""Return a dictionary of statisitics queries.
|
||||
|
||||
If a date is specified and applies to the job it will be used. Otherwise
|
||||
the date will default to the last date metrics put something in the db.
|
||||
"""
|
||||
|
||||
if not date:
|
||||
date = UpdateCount.objects.aggregate(max=Max('date'))['max']
|
||||
|
||||
# If you're editing these, note that you are returning a function!
|
||||
stats = {
|
||||
'addon_total_updatepings': lambda: UpdateCount.objects.filter(date=date).aggregate(sum=Sum('count'))['sum'],
|
||||
'collector_updatepings': lambda: UpdateCount.objects.get(addon=11950, date=date).count,
|
||||
}
|
||||
|
||||
return stats
|
||||
@cronjobs.register
|
||||
def addon_total_contributions():
|
||||
addons = Addon.objects.values_list('id', flat=True)
|
||||
with establish_connection() as conn:
|
||||
for chunk in chunked(addons, 100):
|
||||
tasks.cron_total_contributions.apply_async(args=chunk,
|
||||
connection=conn)
|
||||
|
|
|
@ -234,6 +234,14 @@ class Contribution(caching.base.CachingMixin, models.Model):
|
|||
del(self.post_data['payer_email'])
|
||||
self.save()
|
||||
|
||||
@staticmethod
|
||||
def post_save(sender, instance, **kwargs):
|
||||
from . import tasks
|
||||
tasks.addon_total_contributions.delay([instance.addon_id])
|
||||
|
||||
|
||||
models.signals.post_save.connect(Contribution.post_save, sender=Contribution)
|
||||
|
||||
|
||||
class SubscriptionEvent(ModelBase):
|
||||
"""Save subscription info for future processing."""
|
||||
|
|
|
@ -1 +1,170 @@
|
|||
from . import cron
|
||||
import datetime
|
||||
|
||||
from django.db import connection, transaction
|
||||
from django.db.models import Sum, Max
|
||||
from django.db.models.signals import post_save
|
||||
|
||||
import commonware.log
|
||||
from celery.decorators import task
|
||||
|
||||
import amo
|
||||
from addons.models import Addon
|
||||
from bandwagon.models import Collection
|
||||
from stats.models import Contribution
|
||||
from reviews.models import Review
|
||||
from users.models import UserProfile
|
||||
from versions.models import Version
|
||||
from .models import UpdateCount, DownloadCount
|
||||
|
||||
log = commonware.log.getLogger('z.task')
|
||||
|
||||
|
||||
@task
|
||||
def addon_total_contributions(*addons):
|
||||
"Updates the total contributions for a given addon."
|
||||
|
||||
log.info('[%s@%s] Updating total contributions.' %
|
||||
(len(addons), addon_total_contributions.rate_limit))
|
||||
stats = (Contribution.objects.filter(addon__in=addons).values_list('addon')
|
||||
.annotate(Sum('amount')))
|
||||
|
||||
for addon, total in stats:
|
||||
Addon.objects.filter(id=addon).update(total_contributions=total)
|
||||
|
||||
|
||||
@task(rate_limit='10/m')
|
||||
def cron_total_contributions(*addons):
|
||||
"Rate limited version of `addon_total_contributions` suitable for cron."
|
||||
addon_total_contributions(*addons)
|
||||
|
||||
|
||||
@task(rate_limit='10/m')
|
||||
def update_addons_collections_downloads(data, **kw):
|
||||
log.info("[%s@%s] Updating addons+collections download totals." %
|
||||
(len(data), update_addons_collections_downloads.rate_limit))
|
||||
for var in data:
|
||||
(CollectionAddon.objects.filter(addon=var['addon'],
|
||||
collection=var['collection'])
|
||||
.update(downloads=var['sum']))
|
||||
|
||||
|
||||
@task(rate_limit='15/m')
|
||||
def update_collections_total(data, **kw):
|
||||
log.info("[%s@%s] Updating collections' download totals." %
|
||||
(len(data), update_collections_total.rate_limit))
|
||||
for var in data:
|
||||
(Collection.objects.filter(pk=var['collection_id'])
|
||||
.update(downloads=var['sum']))
|
||||
|
||||
|
||||
@task(rate_limit='20/h')
|
||||
def update_global_totals(job, date):
|
||||
log.info("[%s] Updating global statistics totals (%s) for (%s)" %
|
||||
(update_global_totals.rate_limit, job, date))
|
||||
|
||||
jobs = _get_daily_jobs()
|
||||
jobs.update(_get_metrics_jobs())
|
||||
|
||||
num = jobs[job]()
|
||||
|
||||
q = """REPLACE INTO
|
||||
global_stats(`name`, `count`, `date`)
|
||||
VALUES
|
||||
(%s, %s, %s)"""
|
||||
p = [job, num or 0, date]
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(q, p)
|
||||
transaction.commit_unless_managed()
|
||||
|
||||
|
||||
def _get_daily_jobs(date=None):
|
||||
"""Return a dictionary of statisitics queries.
|
||||
|
||||
If a date is specified and applies to the job it will be used. Otherwise
|
||||
the date will default to today().
|
||||
"""
|
||||
|
||||
if not date:
|
||||
date = datetime.date.today()
|
||||
|
||||
extra = dict(where=['DATE(created)=%s'], params=[date])
|
||||
|
||||
# If you're editing these, note that you are returning a function! This
|
||||
# cheesy hackery was done so that we could pass the queries to celery
|
||||
# lazily and not hammer the db with a ton of these all at once.
|
||||
stats = {
|
||||
# Add-on Downloads
|
||||
'addon_total_downloads': lambda: DownloadCount.objects.filter(
|
||||
date__lte=date).aggregate(sum=Sum('count'))['sum'],
|
||||
'addon_downloads_new': lambda: DownloadCount.objects.filter(
|
||||
date=date).aggregate(sum=Sum('count'))['sum'],
|
||||
|
||||
# Add-on counts
|
||||
'addon_count_public': Addon.objects.filter(
|
||||
created__lte=date, status=amo.STATUS_PUBLIC, inactive=0).count,
|
||||
'addon_count_pending': Version.objects.filter(
|
||||
created__lte=date, files__status=amo.STATUS_PENDING).count,
|
||||
'addon_count_experimental': Addon.objects.filter(
|
||||
created__lte=date, status=amo.STATUS_UNREVIEWED,
|
||||
inactive=0).count,
|
||||
'addon_count_nominated': Addon.objects.filter(
|
||||
created__lte=date, status=amo.STATUS_NOMINATED,
|
||||
inactive=0).count,
|
||||
'addon_count_new': Addon.objects.extra(**extra).count,
|
||||
|
||||
# Version counts
|
||||
'version_count_new': Version.objects.extra(**extra).count,
|
||||
|
||||
# User counts
|
||||
'user_count_total': UserProfile.objects.filter(
|
||||
created__lte=date).count,
|
||||
'user_count_new': UserProfile.objects.extra(**extra).count,
|
||||
|
||||
# Review counts
|
||||
'review_count_total': Review.objects.filter(created__lte=date,
|
||||
editorreview=0).count,
|
||||
'review_count_new': Review.objects.filter(editorreview=0).extra(
|
||||
**extra).count,
|
||||
|
||||
# Collection counts
|
||||
'collection_count_total': Collection.objects.filter(
|
||||
created__lte=date).count,
|
||||
'collection_count_new': Collection.objects.extra(**extra).count,
|
||||
'collection_count_private': Collection.objects.filter(listed=0).count,
|
||||
'collection_count_public': Collection.objects.filter(
|
||||
created__lte=date, listed=1).count,
|
||||
'collection_count_autopublishers': Collection.objects.filter(
|
||||
created__lte=date, type=amo.COLLECTION_SYNCHRONIZED).count,
|
||||
'collection_count_editorspicks': Collection.objects.filter(
|
||||
created__lte=date, type=amo.COLLECTION_FEATURED).count,
|
||||
'collection_count_normal': Collection.objects.filter(
|
||||
created__lte=date, type=amo.COLLECTION_NORMAL).count,
|
||||
|
||||
'collection_addon_downloads': (lambda:
|
||||
AddonCollectionCount.objects.filter(date__lte=date).aggregate(
|
||||
sum=Sum('count'))['sum']),
|
||||
}
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
def _get_metrics_jobs(date=None):
|
||||
"""Return a dictionary of statisitics queries.
|
||||
|
||||
If a date is specified and applies to the job it will be used. Otherwise
|
||||
the date will default to the last date metrics put something in the db.
|
||||
"""
|
||||
|
||||
if not date:
|
||||
date = UpdateCount.objects.aggregate(max=Max('date'))['max']
|
||||
|
||||
# If you're editing these, note that you are returning a function!
|
||||
stats = {
|
||||
'addon_total_updatepings': lambda: UpdateCount.objects.filter(
|
||||
date=date).aggregate(sum=Sum('count'))['sum'],
|
||||
'collector_updatepings': lambda: UpdateCount.objects.get(
|
||||
addon=11950, date=date).count,
|
||||
}
|
||||
|
||||
return stats
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
from django import test
|
||||
import test_utils
|
||||
from nose.tools import eq_
|
||||
|
||||
from stats.cron import _update_global_totals
|
||||
from stats.models import GlobalStat
|
||||
from addons.models import Addon
|
||||
from stats import tasks
|
||||
from stats.models import GlobalStat, Contribution
|
||||
|
||||
|
||||
class TestGlobalStats(test.TestCase):
|
||||
class TestGlobalStats(test_utils.TestCase):
|
||||
|
||||
fixtures = ['stats/test_models']
|
||||
|
||||
|
@ -16,6 +17,31 @@ class TestGlobalStats(test.TestCase):
|
|||
|
||||
eq_(GlobalStat.objects.no_cache().filter(date=date,
|
||||
name=job).count(), 0)
|
||||
_update_global_totals(job, date)
|
||||
eq_(GlobalStat.objects.no_cache().filter(date=date,
|
||||
name=job).count(), 1)
|
||||
tasks.update_global_totals(job, date)
|
||||
eq_(len(GlobalStat.objects.no_cache().filter(date=date,
|
||||
name=job)), 1)
|
||||
|
||||
|
||||
class TestTotalContributions(test_utils.TestCase):
|
||||
fixtures = ['base/fixtures']
|
||||
|
||||
def test_total_contributions(self):
|
||||
|
||||
c = Contribution()
|
||||
c.addon_id = 3615
|
||||
c.amount = '9.99'
|
||||
c.save()
|
||||
|
||||
tasks.addon_total_contributions(3615)
|
||||
a = Addon.objects.no_cache().get(pk=3615)
|
||||
eq_(float(a.total_contributions), 9.99)
|
||||
|
||||
c = Contribution()
|
||||
c.addon_id = 3615
|
||||
c.amount = '10.00'
|
||||
c.save()
|
||||
|
||||
tasks.addon_total_contributions(3615)
|
||||
a = Addon.objects.no_cache().get(pk=3615)
|
||||
eq_(float(a.total_contributions), 19.99)
|
||||
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
ALTER TABLE addons
|
||||
ADD COLUMN total_contributions varchar(10) DEFAULT '0.00'
|
||||
AFTER suggested_amount;
|
||||
|
||||
-- ~48s
|
Загрузка…
Ссылка в новой задаче