[AIRFLOW-1806] Use naive datetime for cron scheduling
Converting to naive time is required in order to make sure to run at exact times for crons. E.g. if you specify to run at 8:00pm every day you do not want suddenly to run at 7:00pm due to DST.
This commit is contained in:
Родитель
2f168634aa
Коммит
dcac3e97a4
|
@ -46,6 +46,8 @@ import textwrap
|
|||
import traceback
|
||||
import warnings
|
||||
import hashlib
|
||||
|
||||
from datetime import datetime
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from sqlalchemy import (
|
||||
|
@ -2996,16 +2998,30 @@ class DAG(BaseDag, LoggingMixin):
|
|||
num=num, delta=self._schedule_interval)
|
||||
|
||||
def following_schedule(self, dttm):
|
||||
"""
|
||||
Calculates the following schedule for this dag in local time
|
||||
:param dttm: utc datetime
|
||||
:return: utc datetime
|
||||
"""
|
||||
if isinstance(self._schedule_interval, six.string_types):
|
||||
dttm = timezone.make_naive(dttm, self.timezone)
|
||||
cron = croniter(self._schedule_interval, dttm)
|
||||
return cron.get_next(datetime)
|
||||
following = timezone.make_aware(cron.get_next(datetime), self.timezone)
|
||||
return timezone.convert_to_utc(following)
|
||||
elif isinstance(self._schedule_interval, timedelta):
|
||||
return dttm + self._schedule_interval
|
||||
|
||||
def previous_schedule(self, dttm):
|
||||
"""
|
||||
Calculates the previous schedule for this dag in local time
|
||||
:param dttm: utc datetime
|
||||
:return: utc datetime
|
||||
"""
|
||||
if isinstance(self._schedule_interval, six.string_types):
|
||||
dttm = timezone.make_naive(dttm, self.timezone)
|
||||
cron = croniter(self._schedule_interval, dttm)
|
||||
return cron.get_prev(datetime)
|
||||
prev = timezone.make_aware(cron.get_prev(datetime), self.timezone)
|
||||
return timezone.convert_to_utc(prev)
|
||||
elif isinstance(self._schedule_interval, timedelta):
|
||||
return dttm - self._schedule_interval
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import datetime as dt
|
||||
import pendulum
|
||||
|
||||
from airflow.settings import TIMEZONE
|
||||
|
@ -66,3 +67,72 @@ def convert_to_utc(value):
|
|||
value = pendulum.instance(value, TIMEZONE)
|
||||
|
||||
return value.astimezone(utc)
|
||||
|
||||
|
||||
def make_aware(value, timezone=None):
|
||||
"""
|
||||
Make a naive datetime.datetime in a given time zone aware.
|
||||
|
||||
:param value: datetime
|
||||
:param timezone: timezone
|
||||
:return: localized datetime in settings.TIMEZONE or timezone
|
||||
|
||||
"""
|
||||
if timezone is None:
|
||||
timezone = TIMEZONE
|
||||
|
||||
# Check that we won't overwrite the timezone of an aware datetime.
|
||||
if is_localized(value):
|
||||
raise ValueError(
|
||||
"make_aware expects a naive datetime, got %s" % value)
|
||||
|
||||
if hasattr(timezone, 'localize'):
|
||||
# This method is available for pytz time zones.
|
||||
return timezone.localize(value)
|
||||
elif hasattr(timezone, 'convert'):
|
||||
# For pendulum
|
||||
return timezone.convert(value)
|
||||
else:
|
||||
# This may be wrong around DST changes!
|
||||
return value.replace(tzinfo=timezone)
|
||||
|
||||
|
||||
def make_naive(value, timezone=None):
|
||||
"""
|
||||
Make an aware datetime.datetime naive in a given time zone.
|
||||
|
||||
:param value: datetime
|
||||
:param timezone: timezone
|
||||
:return: naive datetime
|
||||
"""
|
||||
if timezone is None:
|
||||
timezone = TIMEZONE
|
||||
|
||||
# Emulate the behavior of astimezone() on Python < 3.6.
|
||||
if is_naive(value):
|
||||
raise ValueError("make_naive() cannot be applied to a naive datetime")
|
||||
|
||||
o = value.astimezone(timezone)
|
||||
|
||||
# cross library compatibility
|
||||
naive = dt.datetime(o.year,
|
||||
o.month,
|
||||
o.day,
|
||||
o.hour,
|
||||
o.minute,
|
||||
o.second,
|
||||
o.microsecond)
|
||||
|
||||
return naive
|
||||
|
||||
|
||||
def datetime(*args, **kwargs):
|
||||
"""
|
||||
Wrapper around datetime.datetime that adds settings.TIMEZONE if tzinfo not specified
|
||||
|
||||
:return: datetime.datetime
|
||||
"""
|
||||
if 'tzinfo' not in kwargs:
|
||||
kwargs['tzinfo'] = TIMEZONE
|
||||
|
||||
return dt.datetime(*args, **kwargs)
|
||||
|
|
|
@ -46,3 +46,22 @@ class TimezoneTest(unittest.TestCase):
|
|||
eat = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)
|
||||
utc = datetime.datetime(2011, 9, 1, 10, 20, 30, tzinfo=UTC)
|
||||
self.assertEquals(utc, timezone.convert_to_utc(eat))
|
||||
|
||||
def test_make_naive(self):
|
||||
self.assertEqual(
|
||||
timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT),
|
||||
datetime.datetime(2011, 9, 1, 13, 20, 30))
|
||||
self.assertEqual(
|
||||
timezone.make_naive(datetime.datetime(2011, 9, 1, 17, 20, 30, tzinfo=ICT), EAT),
|
||||
datetime.datetime(2011, 9, 1, 13, 20, 30))
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT)
|
||||
|
||||
def test_make_aware(self):
|
||||
self.assertEqual(
|
||||
timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT),
|
||||
datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT))
|
||||
with self.assertRaises(ValueError):
|
||||
timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT)
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче