!14
This commit is contained in:
Kong Luoxing 2016-04-03 20:04:42 +08:00
Родитель d16cdc9c19 cb8ffcabf6
Коммит 21736dd24a
5 изменённых файлов: 344 добавлений и 207 удалений

Просмотреть файл

@ -1,7 +1,12 @@
from __future__ import absolute_import
from .schedulers import PeriodicTask
from .task import PeriodicTask, Crontab, Interval
from .schedulers import RedisScheduler, RedisScheduleEntry
# Public API of the package.
# Every entry carries its own trailing comma: adjacent string literals are
# silently concatenated by Python, so a missing comma between 'Interval' and
# 'RedisScheduler' would export the non-existent name 'IntervalRedisScheduler'.
__all__ = [
    'PeriodicTask',
    'Crontab',
    'Interval',
    'RedisScheduler',
    'RedisScheduleEntry',
]

Просмотреть файл

@ -10,8 +10,6 @@ from redis.client import StrictRedis
from celery import current_app
from celery.utils.log import get_logger
rdb = StrictRedis.from_url(current_app.conf.CELERY_REDIS_SCHEDULER_URL)
ADD_ENTRY_ERROR = """\
Couldn't add entry %r to redis schedule: %r. Contents: %r
@ -33,4 +31,3 @@ def str_to_bytes(s):
if isinstance(s, str):
return s.encode(default_encoding)
return s

Просмотреть файл

@ -5,66 +5,125 @@
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
import datetime
import logging
from functools import partial
# we don't need simplejson, builtin json module is good enough
import json
from copy import deepcopy
from celery.beat import Scheduler, ScheduleEntry
from redis import StrictRedis
from celery import current_app
import kombu.utils
import celery.schedules
from redis.exceptions import LockError
from .task import PeriodicTask
from .globals import rdb, logger, ADD_ENTRY_ERROR
from .exceptions import ValidationError
from .decoder import DateTimeDecoder, DateTimeEncoder
from .globals import logger
class RedisScheduleEntry(ScheduleEntry):
scheduler = None
class RedisScheduleEntry(object):
"""
The Schedule Entry class is mainly here to handle the celery dependency injection
and delegates everything to a PeriodicTask instance
It follows the Adapter Design pattern, trivially implemented in python with __getattr__ and __setattr__
This is similar to https://github.com/celery/django-celery/blob/master/djcelery/schedulers.py
that uses SQLAlchemy DBModels as delegates.
"""
def __init__(self, task):
self._task = task
def __init__(self, name=None, task=None, enabled=True, last_run_at=None,
total_run_count=None, schedule=None, args=(), kwargs=None,
options=None, app=None, **extrakwargs):
self.app = current_app._get_current_object()
self.name = self._task.key # passing key here as the task name is a human use only field.
self.task = self._task.task
# defaults (MUST NOT call self here - or loop __getattr__ for ever)
app = app or current_app
# Setting a default time a bit before now to not miss a task that was just added.
last_run_at = last_run_at or app.now() - datetime.timedelta(seconds=app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
self.schedule = self._task.schedule
# using periodic task as delegate
object.__setattr__(self, '_task', PeriodicTask(
# Note: for compatibility with celery methods, the name of the task is actually the key in the redis DB.
# For extra fancy fields (like a human readable name), you can leverage extrakwargs.
name=name,
task=task,
enabled=enabled,
schedule=schedule, # TODO : sensible default here ?
args=args,
kwargs=kwargs or {},
options=options or {},
last_run_at=last_run_at,
total_run_count=total_run_count or 0,
**extrakwargs
))
self.args = self._task.args
self.kwargs = self._task.kwargs
self.options = {
'queue': self._task.queue,
'exchange': self._task.exchange,
'routing_key': self._task.routing_key,
'expires': self._task.expires
#
# Initializing members here and not in delegate
#
# The app is kept here (PeriodicTask should not need it)
object.__setattr__(self, 'app', app)
# automatic delegation to PeriodicTask (easy delegate)
def __getattr__(self, attr):
    """Delegate lookup of attributes missing on the entry to the wrapped task.

    Python only calls this hook after regular attribute lookup has failed.

    :param attr: name of the attribute that was not found on the entry
    :return: the attribute of the same name on ``self._task``
    :raises AttributeError: if the delegate is not set yet, or lacks ``attr``
    """
    # If '_task' itself is missing from the instance, evaluating self._task
    # below would re-enter this method without end. Fail with a regular
    # AttributeError instead of exhausting the stack.
    if attr == '_task':
        raise AttributeError(attr)
    return getattr(self._task, attr)
def __setattr__(self, attr, value):
    """Forward attribute assignment to the wrapped task.

    Only attributes that already exist on the delegate can be assigned this
    way; anything else is rejected.

    :param attr: name of the attribute to assign
    :param value: value to store on the delegate
    :raises AttributeError: if there is no delegate yet, or it has no ``attr``
    """
    # Read the delegate straight from the instance dict: going through
    # self._task would trigger delegated lookup and recurse when '_task'
    # has not been stored yet.
    task = self.__dict__.get('_task')
    # NOTE(review): class-level properties of the entry are not consulted
    # here, so an assignment lands on the delegate whenever the delegate has
    # an attribute of that name -- confirm this is intended.
    if task is not None and hasattr(task, attr):
        setattr(task, attr, value)
        return
    # No delegate, or the delegate does not know this attribute.
    raise AttributeError("Attribute {attr} not found in {tasktype}".format(attr=attr, tasktype=type(task)))
#
# Overrides schedule accessors in PeriodicTask to store dict in json but retrieve proper celery schedules
#
def get_schedule(self):
    """Convert the schedule dict stored on the delegate into a celery schedule.

    A dict carrying 'every' and 'period' yields a ``celery.schedules.schedule``;
    a dict carrying the five crontab fields yields a ``celery.schedules.crontab``.

    :return: the celery schedule object, created with ``app=self.app``
    :raises ValueError: if the dict matches neither layout
    """
    spec = self._task.schedule
    if {'every', 'period'}.issubset(spec):
        run_every = datetime.timedelta(**{spec['period']: spec['every']})
        # The app must be passed by keyword: the second positional parameter
        # of celery.schedules.schedule is `relative`, not `app`.
        return celery.schedules.schedule(run_every, app=self.app)
    crontab_fields = ('minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year')
    if set(crontab_fields).issubset(spec):
        return celery.schedules.crontab(app=self.app, **{name: spec[name] for name in crontab_fields})
    raise ValueError('Existing Task schedule type not recognized')
def set_schedule(self, schedule):
if isinstance(schedule, celery.schedules.schedule):
# TODO : unify this with Interval in PeriodicTask
self._task.schedule = {
'every': max(schedule.run_every.total_seconds(), 0),
'period': 'seconds'
}
if not self._task.total_run_count:
self._task.total_run_count = 0
self.total_run_count = self._task.total_run_count
elif isinstance(schedule, celery.schedules.crontab):
# TODO : unify this with Crontab in PeriodicTask
self._task.schedule = {
'minute': schedule._orig_minute,
'hour': schedule._orig_hour,
'day_of_week': schedule._orig_day_of_week,
'day_of_month': schedule._orig_day_of_month,
'month_of_year': schedule._orig_month_of_year
}
else:
raise Exception('New Task schedule type not recognized')
if not self._task.last_run_at:
# subtract some time from the current time to populate the last time
# that the task was run so that a newly scheduled task does not get missed
time_subtract = (self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or 30)
self._task.last_run_at = self._default_now() - datetime.timedelta(seconds=time_subtract)
self.save()
self.last_run_at = self._task.last_run_at
def _default_now(self):
return self.app.now()
def next(self):
self._task.last_run_at = self.app.now()
self._task.total_run_count += 1
return self.__class__(self._task)
__next__ = next
schedule = property(get_schedule, set_schedule)
#
# Overloading ScheduleEntry methods
#
def is_due(self):
"""See :meth:`~celery.schedule.schedule.is_due`."""
due = self.schedule.is_due(self.last_run_at)
if not self.scheduler._lock_acquired:
return celery.schedules.schedstate(is_due=False, next=due[1])
if not self._task.enabled:
logger.info('task %s disabled', self.name)
logger.debug('task {0} due : {1}'.format(self.name, due))
if not self.enabled:
logger.info('task {0} is disabled. not triggered.'.format(self.name))
# if the task is disabled, we always return false, but the time that
# it is next due is returned as usual
return celery.schedules.schedstate(is_due=False, next=due[1])
@ -72,28 +131,58 @@ class RedisScheduleEntry(ScheduleEntry):
return due
def __repr__(self):
return '<RedisScheduleEntry ({0} {1}(*{2}, **{3}) {{4}})>'.format(
self.name, self.task, self.args,
self.kwargs, self.schedule,
return '<RedisScheduleEntry: {0.name} {call} {0.schedule}'.format(
self,
call=kombu.utils.reprcall(self.task, self.args or (), self.kwargs or {}),
)
def reserve(self, entry):
new_entry = Scheduler.reserve(entry)
return new_entry
def update(self, other):
"""
Update values from another entry.
This is used to dynamically update periodic entry from edited redis values
Does not update "non-editable" fields
Extra arguments will be updated (considered editable)
"""
# Handle delegation properly here
self._task.update(other._task)
# we should never need to touch the app here
def save(self):
if self.total_run_count > self._task.total_run_count:
self._task.total_run_count = self.total_run_count
if self.last_run_at and self._task.last_run_at and self.last_run_at > self._task.last_run_at:
self._task.last_run_at = self.last_run_at
self._task.save()
#
# ScheduleEntry needs to be an iterable
#
# from celery.beat.ScheduleEntry._default_now
def _default_now(self):
return self.get_schedule().now() if self.schedule else self.app.now()
# from celery.beat.ScheduleEntry._next_instance
def _next_instance(self, last_run_at=None):
"""Return a new instance of the same class, but with
its date and count fields updated."""
return self.__class__(**dict(
self,
last_run_at=last_run_at or self._default_now(),
total_run_count=self.total_run_count + 1,
))
__next__ = next = _next_instance # for 2to3
def __iter__(self):
# We need to delegate iter (iterate on task members, not on multiple tasks)
# Following celery.SchedulerEntry.__iter__() design
return iter(self._task)
@staticmethod
def get_all_as_dict(scheduler_url, key_prefix):
"""get all of the tasks, for best performance with large amount of tasks, return a generator
"""
# Calling another generator
for task_key, task_dict in PeriodicTask.get_all_as_dict(scheduler_url, key_prefix):
yield task_key, task_dict
@classmethod
def from_entry(cls, name, skip_fields=('relative', 'options'), **entry):
def from_entry(cls, scheduler_url, name, **entry):
options = entry.get('options') or {}
fields = dict(entry)
for skip_field in skip_fields:
fields.pop(skip_field, None)
fields['name'] = current_app.conf.CELERY_REDIS_SCHEDULER_KEY_PREFIX + name
schedule = fields.pop('schedule')
schedule = celery.schedules.maybe_schedule(schedule)
@ -110,29 +199,21 @@ class RedisScheduleEntry(ScheduleEntry):
fields['args'] = fields.get('args', [])
fields['kwargs'] = fields.get('kwargs', {})
fields['queue'] = options.get('queue')
fields['exchange'] = options.get('exchange')
fields['routing_key'] = options.get('routing_key')
fields['key'] = fields['name']
return cls(PeriodicTask.from_dict(fields))
return cls(PeriodicTask.from_dict(fields, scheduler_url))
class RedisScheduler(Scheduler):
# how often should we sync in schedule information
# from the backend redis database
UPDATE_INTERVAL = datetime.timedelta(seconds=5)
Entry = RedisScheduleEntry
def __init__(self, *args, **kwargs):
if hasattr(current_app.conf, 'CELERY_REDIS_SCHEDULER_URL'):
logger.info('backend scheduler using %s',
current_app.conf.CELERY_REDIS_SCHEDULER_URL)
else:
logger.info('backend scheduler using %s',
current_app.conf.CELERY_REDIS_SCHEDULER_URL)
self._schedule = {}
self._dirty = set() # keeping modified entries by name for sync later on
self._schedule = {} # keeping dynamic schedule from redis DB here
# self.data is used for statically configured schedule
self.schedule_url = current_app.conf.CELERY_REDIS_SCHEDULER_URL
self.rdb = StrictRedis.from_url(self.schedule_url)
self._last_updated = None
Scheduler.__init__(self, *args, **kwargs)
self.max_interval = (kwargs.get('max_interval') \
@ -141,16 +222,38 @@ class RedisScheduler(Scheduler):
self._lock_acquired = self._lock.acquire(blocking=False)
self.Entry.scheduler = self
# This will launch setup_schedule if not lazy
super(RedisScheduler, self).__init__(*args, **kwargs)
def setup_schedule(self):
self.install_default_entries(self.schedule)
super(RedisScheduler, self).setup_schedule()
# In case we have a preconfigured schedule
self.update_from_dict(self.app.conf.CELERYBEAT_SCHEDULE)
def requires_update(self):
    """Tell whether the schedule should be pulled again from the backend.

    :return: True when no refresh has been recorded yet, or when more than
        ``UPDATE_INTERVAL`` has elapsed since ``_last_updated``
    """
    previous = self._last_updated
    if previous:
        refresh_due_at = previous + self.UPDATE_INTERVAL
        return refresh_due_at < datetime.datetime.now()
    # never refreshed so far
    return True
def tick(self):
    """Run one scheduler iteration.

    The schedule returned by ``all_as_schedule()`` is first merged into the
    scheduler via ``merge_inplace()``; the parent class ``tick()`` then runs
    and its result is returned.

    :raises Exception: any error raised while fetching or merging is logged
        and re-raised unchanged
    """
    try:
        # Entries may have been edited in the DB since the previous tick.
        self.merge_inplace(self.all_as_schedule())
    except Exception as exc:
        logger.error("Exception when getting tasks from {url} : {exc}".format(url=self.schedule_url, exc=exc))
        # TODO: make the merge atomic so it can be cancelled if there is a problem
        raise
    # Trace the schedule as it stands after the merge.
    logger.debug("DB schedule : {0}".format(self.schedule))
    # The parent implementation takes over from here.
    return super(RedisScheduler, self).tick()
def all_as_schedule(self, key_prefix=None, entry_class=None):
logger.debug('RedisScheduler: Fetching database schedule')
key_prefix = key_prefix or current_app.conf.CELERY_REDIS_SCHEDULER_KEY_PREFIX
entry_class = entry_class or self.Entry
def tick(self):
if not self._lock_acquired:
@ -164,30 +267,31 @@ class RedisScheduler(Scheduler):
def get_from_database(self):
self.sync()
d = {}
for task in PeriodicTask.get_all(current_app.conf.CELERY_REDIS_SCHEDULER_KEY_PREFIX):
t = PeriodicTask.from_dict(task)
d[t.key] = RedisScheduleEntry(t)
for key, task in entry_class.get_all_as_dict(self.rdb, key_prefix):
# logger.debug('Building {0} from : {1}'.format(entry_class, task))
d[key] = entry_class(**dict(task, app=self.app))
return d
def update_from_dict(self, dict_):
s = {}
for name, entry in dict_.items():
try:
s[name] = self.Entry.from_entry(name, **entry)
except Exception as exc:
error(ADD_ENTRY_ERROR, name, exc, entry)
self.schedule.update(s)
@property
def schedule(self):
if self.requires_update():
self._schedule = self.get_from_database()
self._last_updated = datetime.datetime.now()
return self._schedule
def reserve(self, entry):
# called when the task is about to be run (and data will be modified -> sync() will need to save it)
new_entry = super(RedisScheduler, self).reserve(entry)
# Need to store the key of the entry, because the entry may change in the mean time.
self._dirty.add(new_entry.name)
return new_entry
def sync(self):
for entry in self._schedule.values():
entry.save()
logger.info('Writing modified entries...')
_tried = set()
try:
while self._dirty:
name = self._dirty.pop()
_tried.add(name)
# Saving the entry back into Redis DB.
self.rdb.set(name, self.schedule[name].jsondump())
except Exception as exc:
# retry later
self._dirty |= _tried
logger.error('Error while sync: %r', exc, exc_info=1)
def close(self):
try:
@ -202,3 +306,7 @@ class RedisScheduler(Scheduler):
self._lock.release()
except LockError:
pass
@property
def info(self):
return ' . db -> {self.schedule_url}'.format(self=self)

Просмотреть файл

@ -6,70 +6,18 @@
# of the License at http://www.apache.org/licenses/LICENSE-2.0
import datetime
from copy import deepcopy
from redis import StrictRedis
try:
import simplejson as json
except ImportError:
import json
import celery.schedules
from .decoder import DateTimeDecoder, DateTimeEncoder
from .exceptions import ValidationError
from .globals import rdb, bytes_to_str, default_encoding, logger
class PeriodicTask(object):
'''represents a periodic task
'''
name = None
task = None
type_ = None
interval = None
crontab = None
args = []
kwargs = {}
queue = None
exchange = None
routing_key = None
# datetime
expires = None
enabled = True
# datetime
last_run_at = None
total_run_count = 0
date_changed = None
description = None
no_changes = False
def __init__(self, name, task, schedule, key, queue='celery', enabled=True, task_args=[], task_kwargs={}, **kwargs):
self.task = task
self.enabled = enabled
if isinstance(schedule, self.Interval):
self.interval = schedule
if isinstance(schedule, self.Crontab):
self.crontab = schedule
self.queue = queue
self.args = task_args
self.kwargs = task_kwargs
self.name = name
self.key = key
self.delete_key = 'deleted:' + bytes_to_str(self.key)
self.running = False
class Interval(object):
def __init__(self, every, period='seconds'):
@ -77,10 +25,6 @@ class PeriodicTask(object):
# could be seconds minutes hours
self.period = period
@property
def schedule(self):
return celery.schedules.schedule(datetime.timedelta(**{self.period: self.every}))
@property
def period_singular(self):
return self.period[:-1]
@ -90,22 +34,15 @@ class PeriodicTask(object):
return 'every {0.period_singular}'.format(self)
return 'every {0.every} {0.period}'.format(self)
class Crontab(object):
def __init__(self, minute, hour, day_of_week, day_of_month, month_of_year):
def __init__(self, minute=0, hour=0, day_of_week=None, day_of_month=None, month_of_year=None):
self.minute = minute
self.hour = hour
self.day_of_week = day_of_week
self.day_of_month = day_of_month
self.month_of_year = month_of_year
@property
def schedule(self):
return celery.schedules.crontab(minute=self.minute,
hour=self.hour,
day_of_week=self.day_of_week,
day_of_month=self.day_of_month,
month_of_year=self.month_of_year)
self.day_of_week = day_of_week or '*'
self.day_of_month = day_of_month or '*'
self.month_of_year = month_of_year or '*'
def __unicode__(self):
rfield = lambda f: f and str(f).replace(' ', '') or '*'
@ -114,10 +51,78 @@ class PeriodicTask(object):
rfield(self.day_of_month), rfield(self.month_of_year),
)
class PeriodicTask(object):
"""
Represents a periodic task.
This follows the celery.beat.ScheduleEntry class design.
However it is independent of any celery import, so that any client library can import this module
and use it to manipulate periodic tasks into a Redis database, without worrying about all the celery imports.
Should follow the SQLAlchemy DBModel design.
These are used as delegate from https://github.com/celery/django-celery/blob/master/djcelery/schedulers.py
"""
name = None
task = None
data = None
args = []
kwargs = {}
options = {}
enabled = True
# datetime
last_run_at = None
total_run_count = 0
# Follow celery.beat.SchedulerEntry:__init__() signature as much as possible
def __init__(self, name, task, schedule, enabled=True, args=(), kwargs=None, options=None,
last_run_at=None, total_run_count=None, **extrakwargs):
"""
:param name: name of the task ( = redis key )
:param task: taskname ( as in celery : python function name )
:param schedule: the schedule. maybe also a dict with all schedule content
:param relative: if the schedule time needs to be relative to the interval ( see celery.schedules )
:param enabled: whether this task is enabled or not
:param args: args for the task
:param kwargs: kwargs for the task
:param options: options for the task
:param last_run_at: last time the task was run
:param total_run_count: total number of times the task was run
:return:
"""
self.task = task
self.enabled = enabled
# Using schedule property conversion
# logger.warn("Schedule in Task init {s}".format(s=schedule))
self.schedule = schedule
self.args = args
self.kwargs = kwargs or {}
self.options = options or {}
self.last_run_at = last_run_at
self.total_run_count = total_run_count
self.name = name
self.key = key
self.delete_key = 'deleted:' + bytes_to_str(self.key)
self.running = False
# storing extra arguments (might be useful to have other args depending on application)
for elem in extrakwargs.keys():
setattr(self, elem, extrakwargs[elem])
@staticmethod
def get_all(key_prefix):
def get_all_as_dict(rdb, key_prefix):
"""get all of the tasks, for best performance with large amount of tasks, return a generator
"""
tasks = rdb.keys(key_prefix + '*')
for task_key in tasks:
try:
@ -172,41 +177,64 @@ class PeriodicTask(object):
@staticmethod
def from_dict(d):
"""
build PeriodicTask instance from dict
:param d: dict
:return: PeriodicTask instance
Update values from another task.
This is used to dynamically update periodic task from edited redis values
Does not update "non-editable" fields (last_run_at, total_run_count).
Extra arguments will be updated (considered editable)
"""
if d.get('interval'):
schedule = PeriodicTask.Interval(d['interval']['every'], d['interval']['period'])
if d.get('crontab'):
schedule = PeriodicTask.Crontab(
d['crontab']['minute'],
d['crontab']['hour'],
d['crontab']['day_of_week'],
d['crontab']['day_of_month'],
d['crontab']['month_of_year']
)
task = PeriodicTask(d['name'], d['task'], schedule, d['key'])
for elem in d:
if elem not in ('interval', 'crontab', 'schedule'):
setattr(task, elem, d[elem])
return task
otherdict = other.__dict__ # note : schedule property is not part of the dict.
otherdict.pop('last_run_at')
otherdict.pop('total_run_count')
self.__dict__.update(otherdict)
@property
def schedule(self):
if self.interval:
return self.interval.schedule
elif self.crontab:
return self.crontab.schedule
else:
raise Exception('must define interval or crontab schedule')
def __repr__(self):
return '<PeriodicTask ({0} {1}(*{2}, **{3}) options: {4} schedule: {5})>'.format(
self.name, self.task, self.args,
self.kwargs, self.options, self.schedule,
)
def __unicode__(self):
fmt = '{0.name}: {{no schedule}}'
if self.interval:
fmt = '{0.name}: {0.interval}'
elif self.crontab:
fmt = '{0.name}: {0.crontab}'
else:
raise Exception('must define internal or crontab schedule')
fmt = '{0.name}: {0.schedule}'
return fmt.format(self)
def get_schedule(self):
"""
schedule Interval / Crontab -> dict
:return:
"""
return vars(self.data)
def set_schedule(self, schedule):
    """Store the task schedule, converting a plain dict when needed.

    :param schedule: an ``Interval`` or ``Crontab`` instance, or a dict of
        keyword arguments accepted by one of those constructors
    :raises ValueError: if ``schedule`` matches neither constructor
    """
    if isinstance(schedule, (Interval, Crontab)):
        self.data = schedule
        return
    for schedule_type in (Interval, Crontab):
        try:
            self.data = schedule_type(**schedule)
        except TypeError:
            # keyword arguments do not fit this type: try the next one
            continue
        return
    # Build the message once so the log line and the exception agree
    # (the logging call returns None and cannot serve as the message).
    message = "Schedule {s} didn't match Crontab or Interval type".format(s=schedule)
    logger.warning(message)
    raise ValueError(message)
schedule = property(get_schedule, set_schedule)
def __iter__(self):
    """Iterate over ``(name, value)`` pairs of the instance attributes.

    The internal ``data`` attribute is not exposed under its own name: it is
    reported as ``schedule`` with the value of the ``schedule`` property.
    All other instance attributes are yielded unchanged.
    """
    # .items() is available on both Python 2 and Python 3, unlike .iteritems()
    for key, value in vars(self).items():
        if key == 'data':
            yield 'schedule', self.schedule
        else:
            yield key, value

Просмотреть файл

@ -3,7 +3,7 @@ from setuptools import setup
setup(
name = "celerybeat-redis",
description = "A Celery Beat Scheduler that uses Redis to store both schedule definitions and status information",
version = "0.1.3",
version = "0.1.5",
license = "Apache License, Version 2.0",
author = "Kong Luoxing",
author_email = "kong.luoxing@gmail.com",
@ -19,9 +19,8 @@ setup(
install_requires=[
'setuptools',
'redis',
'celery',
'simplejson',
'redis>=2.10.3',
'celery>=3.1.16'
]
)