send celery stats to graphite (bug 662665)
This commit is contained in:
Родитель
5dd91a5e19
Коммит
c7a6403a2b
|
@ -0,0 +1,37 @@
|
|||
import logging
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
import redisutils
|
||||
from graphite import graphite
|
||||
|
||||
from amo.tasks import task_stats
|
||||
|
||||
log = logging.getLogger('z.redis')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Subscribe to celery events and publish to graphite."
|
||||
|
||||
def handle(self, *args, **kw):
|
||||
redis = redisutils.connections['master']
|
||||
while 1:
|
||||
stats = []
|
||||
d = zip(['pending', 'failed', 'total'], task_stats.stats())
|
||||
for key, dict_ in d:
|
||||
for name, value in dict_.items():
|
||||
stats.append(('celery.tasks.%s.%s' % (key, name), value))
|
||||
graphite.sendall(*stats)
|
||||
|
||||
# We don't care about the message, just block until the next one.
|
||||
redis.subscribe('celery.tasks.stats')
|
||||
listener = redis.listen()
|
||||
while 1:
|
||||
if listener.next()['type'] == 'message':
|
||||
break
|
||||
|
||||
# Unsubscribe so we can process the stats.
|
||||
redis.unsubscribe()
|
||||
while 1:
|
||||
if listener.next()['type'] == 'unsubscribe':
|
||||
break
|
|
@ -72,6 +72,7 @@ class TaskStats(object):
|
|||
# id in here.
|
||||
self.redis.hincrby(self.pending, sender, 1)
|
||||
self.redis.hset(self.timer, kw['id'], time.time())
|
||||
self.redis.publish('celery.tasks.stats', 'sent')
|
||||
|
||||
def on_postrun(self, sender, **kw):
|
||||
# sender is the task object. task_id in here.
|
||||
|
@ -83,10 +84,12 @@ class TaskStats(object):
|
|||
if start:
|
||||
t = (time.time() - float(start)) * 1000
|
||||
statsd.timing('tasks.%s' % sender.name, int(t))
|
||||
self.redis.publish('celery.tasks.stats', 'postrun')
|
||||
|
||||
def on_failure(self, sender, **kw):
|
||||
# sender is the task object.
|
||||
self.redis.hincrby(self.failed, sender.name, 1)
|
||||
self.redis.publish('celery.tasks.stats', 'failure')
|
||||
|
||||
def stats(self):
|
||||
get = self.redis.hgetall
|
||||
|
|
Загрузка…
Ссылка в новой задаче