This commit is contained in:
Ask Solem 2016-03-14 14:36:24 -07:00
Родитель 3be6bb6d25
Коммит 51fca36f19
8 изменённых файлов: 60 добавлений и 49 удалений

Просмотреть файл

@ -22,9 +22,10 @@ from .utils.log import get_logger
try:
from greenlet import GreenletExit
IGNORE_ERRORS = (GreenletExit,)
except ImportError: # pragma: no cover
IGNORE_ERRORS = ()
else:
IGNORE_ERRORS = (GreenletExit,)
__all__ = ['Blueprint', 'Step', 'StartStopStep', 'ConsumerStep']
@ -34,7 +35,6 @@ CLOSE = 0x2
TERMINATE = 0x3
logger = get_logger(__name__)
debug = logger.debug
def _pre(ns, fmt):
@ -123,7 +123,7 @@ class Blueprint(object):
self._debug('Starting %s', step.alias)
self.started = i + 1
step.start(parent)
debug('^-- substep ok')
logger.debug('^-- substep ok')
def human_state(self):
return self.state_to_name[self.state or 0]
@ -271,7 +271,7 @@ class Blueprint(object):
return step.name, step
def _debug(self, msg, *args):
return debug(_pre(self, msg), *args)
return logger.debug(_pre(self, msg), *args)
@property
def alias(self):

Просмотреть файл

@ -7,7 +7,7 @@
users, groups, and so on.
"""
from __future__ import absolute_import, print_function
from __future__ import absolute_import, print_function, unicode_literals
import atexit
import errno

Просмотреть файл

@ -6,7 +6,7 @@
Task results/state and groups of results.
"""
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
import time
@ -873,11 +873,6 @@ class EagerResult(AsyncResult):
def _get_task_meta(self):
return self._cache
@property
def _cache(self):
return {'task_id': self.id, 'result': self._result, 'status':
self._state, 'traceback': self._traceback}
def __del__(self):
pass
@ -912,6 +907,11 @@ class EagerResult(AsyncResult):
def __repr__(self):
return '<EagerResult: {0.id}>'.format(self)
@property
def _cache(self):
return {'task_id': self.id, 'result': self._result, 'status':
self._state, 'traceback': self._traceback}
@property
def result(self):
"""The tasks return value"""

Просмотреть файл

@ -7,11 +7,12 @@
should run.
"""
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
import numbers
import re
from bisect import bisect, bisect_left
from collections import namedtuple
from datetime import datetime, timedelta
@ -72,11 +73,11 @@ class schedule(object):
"""Schedule for periodic task.
:param run_every: Interval in seconds (or a :class:`~datetime.timedelta`).
:param relative: If set to True the run time will be rounded to the
:keyword relative: If set to True the run time will be rounded to the
resolution of the interval.
:param nowfun: Function returning the current date and time
:keyword nowfun: Function returning the current date and time
(class:`~datetime.datetime`).
:param app: Celery app instance.
:keyword app: Celery app instance.
"""
relative = False
@ -431,14 +432,13 @@ class crontab(schedule):
return result
def _delta_to_next(self, last_run_at, next_hour, next_minute):
"""
Takes a datetime of last run, next minute and hour, and
"""Takes a datetime of last run, next minute and hour, and
returns a relativedelta for the next scheduled day and time.
Only called when day_of_month and/or month_of_year cronspec
is specified to further limit scheduled task execution.
"""
from bisect import bisect, bisect_left
"""
datedata = AttributeDict(year=last_run_at.year)
days_of_month = sorted(self.day_of_month)
months_of_year = sorted(self.month_of_year)
@ -515,16 +515,20 @@ class crontab(schedule):
now = self.maybe_make_aware(self.now())
dow_num = last_run_at.isoweekday() % 7 # Sunday is day 0, not day 7
execute_this_date = (last_run_at.month in self.month_of_year and
last_run_at.day in self.day_of_month and
dow_num in self.day_of_week)
execute_this_date = (
last_run_at.month in self.month_of_year and
last_run_at.day in self.day_of_month and
dow_num in self.day_of_week
)
execute_this_hour = (execute_this_date and
last_run_at.day == now.day and
last_run_at.month == now.month and
last_run_at.year == now.year and
last_run_at.hour in self.hour and
last_run_at.minute < max(self.minute))
execute_this_hour = (
execute_this_date and
last_run_at.day == now.day and
last_run_at.month == now.month and
last_run_at.year == now.year and
last_run_at.hour in self.hour and
last_run_at.minute < max(self.minute)
)
if execute_this_hour:
next_minute = min(minute for minute in self.minute
@ -549,12 +553,14 @@ class crontab(schedule):
if day > dow_num] or self.day_of_week)
add_week = next_day == dow_num
delta = ffwd(weeks=add_week and 1 or 0,
weekday=(next_day - 1) % 7,
hour=next_hour,
minute=next_minute,
second=0,
microsecond=0)
delta = ffwd(
weeks=add_week and 1 or 0,
weekday=(next_day - 1) % 7,
hour=next_hour,
minute=next_minute,
second=0,
microsecond=0,
)
else:
delta = self._delta_to_next(last_run_at,
next_hour, next_minute)
@ -581,11 +587,13 @@ class crontab(schedule):
def __eq__(self, other):
if isinstance(other, crontab):
return (other.month_of_year == self.month_of_year and
other.day_of_month == self.day_of_month and
other.day_of_week == self.day_of_week and
other.hour == self.hour and
other.minute == self.minute)
return (
other.month_of_year == self.month_of_year and
other.day_of_month == self.day_of_month and
other.day_of_week == self.day_of_week and
other.hour == self.hour and
other.minute == self.minute
)
return NotImplemented
def __ne__(self, other):
@ -715,8 +723,8 @@ class solar(schedule):
start=last_run_at_utc, use_center=self.use_center,
)
except self.ephem.CircumpolarError: # pragma: no cover
"""Sun will not rise/set today. Check again tomorrow
(specifically, after the next anti-transit)."""
# Sun will not rise/set today. Check again tomorrow
# (specifically, after the next anti-transit).
next_utc = (
self.cal.next_antitransit(self.ephem.Sun()) +
timedelta(minutes=1)
@ -743,9 +751,11 @@ class solar(schedule):
def __eq__(self, other):
if isinstance(other, solar):
return (other.event == self.event and
other.lat == self.lat and
other.lon == self.lon)
return (
other.event == self.event and
other.lat == self.lat and
other.lon == self.lon
)
return NotImplemented
def __ne__(self, other):

Просмотреть файл

@ -12,7 +12,8 @@
See :ref:`signals` for more information.
"""
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
from .utils.dispatch import Signal
__all__ = ['before_task_publish', 'after_task_publish',

Просмотреть файл

@ -57,7 +57,7 @@ Misc.
-----
"""
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
__all__ = ['PENDING', 'RECEIVED', 'STARTED', 'SUCCESS', 'FAILURE',
'REVOKED', 'RETRY', 'IGNORED', 'READY_STATES', 'UNREADY_STATES',

Просмотреть файл

@ -6,7 +6,7 @@
Utility functions.
"""
from __future__ import absolute_import, print_function
from __future__ import absolute_import, print_function, unicode_literals
import numbers
import os
@ -56,7 +56,7 @@ or did you escape and the value was expanded twice? (%%N -> %N -> %hostname)?
#: We use it to find out the name of the original ``__main__``
#: module, so that we can properly rewrite the name of the
#: task to be that of ``App.main``.
MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE') or None
MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE')
#: Exchange for worker direct queues.
WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')

Просмотреть файл

@ -6,7 +6,7 @@
Abstract classes.
"""
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
from abc import ABCMeta, abstractmethod, abstractproperty
from collections import Callable