From dcac3e97a4e1b4429e4baf9d8ab2a7eb4139ad74 Mon Sep 17 00:00:00 2001
From: Bolke de Bruin
Date: Sat, 11 Nov 2017 13:38:59 +0100
Subject: [PATCH] [AIRFLOW-1806] Use naive datetime for cron scheduling

Converting to naive time is required in order to make sure
to run at exact times for crons.
E.g. if you specify to run at 8:00pm every day you do not
want suddenly to run at 7:00pm due to DST.
---
 airflow/models.py            | 20 +++++++++--
 airflow/utils/timezone.py    | 70 ++++++++++++++++++++++++++++++++++++
 tests/utils/test_timezone.py | 19 ++++++++++
 3 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/airflow/models.py b/airflow/models.py
index f8a5f0f025..33f3636e52 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -46,6 +46,8 @@ import textwrap
 import traceback
 import warnings
 import hashlib
+
+from datetime import datetime
 from urllib.parse import urlparse
 
 from sqlalchemy import (
@@ -2996,16 +2998,30 @@ class DAG(BaseDag, LoggingMixin):
             num=num, delta=self._schedule_interval)
 
     def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in local time
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
         if isinstance(self._schedule_interval, six.string_types):
+            dttm = timezone.make_naive(dttm, self.timezone)
             cron = croniter(self._schedule_interval, dttm)
-            return cron.get_next(datetime)
+            following = timezone.make_aware(cron.get_next(datetime), self.timezone)
+            return timezone.convert_to_utc(following)
         elif isinstance(self._schedule_interval, timedelta):
             return dttm + self._schedule_interval
 
     def previous_schedule(self, dttm):
+        """
+        Calculates the previous schedule for this dag in local time
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
         if isinstance(self._schedule_interval, six.string_types):
+            dttm = timezone.make_naive(dttm, self.timezone)
             cron = croniter(self._schedule_interval, dttm)
-            return cron.get_prev(datetime)
+            prev = timezone.make_aware(cron.get_prev(datetime), self.timezone)
+            return timezone.convert_to_utc(prev)
         elif isinstance(self._schedule_interval, timedelta):
             return dttm - self._schedule_interval
 
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index b8fe89e386..5ae7802b74 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import datetime as dt
 import pendulum
 
 from airflow.settings import TIMEZONE
@@ -66,3 +67,72 @@ def convert_to_utc(value):
         value = pendulum.instance(value, TIMEZONE)
 
     return value.astimezone(utc)
+
+
+def make_aware(value, timezone=None):
+    """
+    Make a naive datetime.datetime in a given time zone aware.
+
+    :param value: datetime
+    :param timezone: timezone
+    :return: localized datetime in settings.TIMEZONE or timezone
+
+    """
+    if timezone is None:
+        timezone = TIMEZONE
+
+    # Check that we won't overwrite the timezone of an aware datetime.
+    if is_localized(value):
+        raise ValueError(
+            "make_aware expects a naive datetime, got %s" % value)
+
+    if hasattr(timezone, 'localize'):
+        # This method is available for pytz time zones.
+        return timezone.localize(value)
+    elif hasattr(timezone, 'convert'):
+        # For pendulum
+        return timezone.convert(value)
+    else:
+        # This may be wrong around DST changes!
+        return value.replace(tzinfo=timezone)
+
+
+def make_naive(value, timezone=None):
+    """
+    Make an aware datetime.datetime naive in a given time zone.
+
+    :param value: datetime
+    :param timezone: timezone
+    :return: naive datetime
+    """
+    if timezone is None:
+        timezone = TIMEZONE
+
+    # Emulate the behavior of astimezone() on Python < 3.6.
+    if is_naive(value):
+        raise ValueError("make_naive() cannot be applied to a naive datetime")
+
+    o = value.astimezone(timezone)
+
+    # cross library compatibility
+    naive = dt.datetime(o.year,
+                        o.month,
+                        o.day,
+                        o.hour,
+                        o.minute,
+                        o.second,
+                        o.microsecond)
+
+    return naive
+
+
+def datetime(*args, **kwargs):
+    """
+    Wrapper around datetime.datetime that adds settings.TIMEZONE if tzinfo not specified
+
+    :return: datetime.datetime
+    """
+    if 'tzinfo' not in kwargs:
+        kwargs['tzinfo'] = TIMEZONE
+
+    return dt.datetime(*args, **kwargs)
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
index 778c772440..3d4cc7c7c9 100644
--- a/tests/utils/test_timezone.py
+++ b/tests/utils/test_timezone.py
@@ -46,3 +46,22 @@ class TimezoneTest(unittest.TestCase):
         eat = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)
         utc = datetime.datetime(2011, 9, 1, 10, 20, 30, tzinfo=UTC)
         self.assertEquals(utc, timezone.convert_to_utc(eat))
+
+    def test_make_naive(self):
+        self.assertEqual(
+            timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT),
+            datetime.datetime(2011, 9, 1, 13, 20, 30))
+        self.assertEqual(
+            timezone.make_naive(datetime.datetime(2011, 9, 1, 17, 20, 30, tzinfo=ICT), EAT),
+            datetime.datetime(2011, 9, 1, 13, 20, 30))
+
+        with self.assertRaises(ValueError):
+            timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT)
+
+    def test_make_aware(self):
+        self.assertEqual(
+            timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT),
+            datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT))
+        with self.assertRaises(ValueError):
+            timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT)
+