diff --git a/airflow/models.py b/airflow/models.py index f8a5f0f025..33f3636e52 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -46,6 +46,8 @@ import textwrap import traceback import warnings import hashlib + +from datetime import datetime from urllib.parse import urlparse from sqlalchemy import ( @@ -2996,16 +2998,30 @@ class DAG(BaseDag, LoggingMixin): num=num, delta=self._schedule_interval) def following_schedule(self, dttm): + """ + Calculates the following schedule for this dag in local time + :param dttm: utc datetime + :return: utc datetime + """ if isinstance(self._schedule_interval, six.string_types): + dttm = timezone.make_naive(dttm, self.timezone) cron = croniter(self._schedule_interval, dttm) - return cron.get_next(datetime) + following = timezone.make_aware(cron.get_next(datetime), self.timezone) + return timezone.convert_to_utc(following) elif isinstance(self._schedule_interval, timedelta): return dttm + self._schedule_interval def previous_schedule(self, dttm): + """ + Calculates the previous schedule for this dag in local time + :param dttm: utc datetime + :return: utc datetime + """ if isinstance(self._schedule_interval, six.string_types): + dttm = timezone.make_naive(dttm, self.timezone) cron = croniter(self._schedule_interval, dttm) - return cron.get_prev(datetime) + prev = timezone.make_aware(cron.get_prev(datetime), self.timezone) + return timezone.convert_to_utc(prev) elif isinstance(self._schedule_interval, timedelta): return dttm - self._schedule_interval diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index b8fe89e386..5ae7802b74 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import datetime as dt import pendulum from airflow.settings import TIMEZONE @@ -66,3 +67,72 @@ def convert_to_utc(value): value = pendulum.instance(value, TIMEZONE) return value.astimezone(utc) + + +def make_aware(value, timezone=None): + """ + Make a naive datetime.datetime in a given time zone aware. + + :param value: datetime + :param timezone: timezone + :return: localized datetime in settings.TIMEZONE or timezone + + """ + if timezone is None: + timezone = TIMEZONE + + # Check that we won't overwrite the timezone of an aware datetime. + if is_localized(value): + raise ValueError( + "make_aware expects a naive datetime, got %s" % value) + + if hasattr(timezone, 'localize'): + # This method is available for pytz time zones. + return timezone.localize(value) + elif hasattr(timezone, 'convert'): + # For pendulum + return timezone.convert(value) + else: + # This may be wrong around DST changes! + return value.replace(tzinfo=timezone) + + +def make_naive(value, timezone=None): + """ + Make an aware datetime.datetime naive in a given time zone. + + :param value: datetime + :param timezone: timezone + :return: naive datetime + """ + if timezone is None: + timezone = TIMEZONE + + # Emulate the behavior of astimezone() on Python < 3.6. + if is_naive(value): + raise ValueError("make_naive() cannot be applied to a naive datetime") + + o = value.astimezone(timezone) + + # cross library compatibility + naive = dt.datetime(o.year, + o.month, + o.day, + o.hour, + o.minute, + o.second, + o.microsecond) + + return naive + + +def datetime(*args, **kwargs): + """ + Wrapper around datetime.datetime that adds settings.TIMEZONE if tzinfo not specified + + :return: datetime.datetime + """ + if 'tzinfo' not in kwargs: + kwargs['tzinfo'] = TIMEZONE + + return dt.datetime(*args, **kwargs) diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py index 778c772440..3d4cc7c7c9 100644 --- a/tests/utils/test_timezone.py +++ b/tests/utils/test_timezone.py @@ -46,3 +46,22 @@ class TimezoneTest(unittest.TestCase): eat = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT) utc = datetime.datetime(2011, 9, 1, 10, 20, 30, tzinfo=UTC) self.assertEquals(utc, timezone.convert_to_utc(eat)) + + def test_make_naive(self): + self.assertEqual( + timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT), + datetime.datetime(2011, 9, 1, 13, 20, 30)) + self.assertEqual( + timezone.make_naive(datetime.datetime(2011, 9, 1, 17, 20, 30, tzinfo=ICT), EAT), + datetime.datetime(2011, 9, 1, 13, 20, 30)) + + with self.assertRaises(ValueError): + timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT) + + def test_make_aware(self): + self.assertEqual( + timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT), + datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)) + with self.assertRaises(ValueError): + timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT) +