diff --git a/airflow/contrib/sensors/weekday_sensor.py b/airflow/contrib/sensors/weekday_sensor.py new file mode 100644 index 0000000000..4cff263f38 --- /dev/null +++ b/airflow/contrib/sensors/weekday_sensor.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import six +from airflow.contrib.utils.weekday import WeekDay +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.utils import timezone +from airflow.utils.decorators import apply_defaults + + +class DayOfWeekSensor(BaseSensorOperator): + """ + Waits until the first specified day of the week. For example, if the execution + day of the task is '2018-12-22' (Saturday) and you pass 'FRIDAY', the task will wait + until next Friday. + + **Example** (with single day): :: + + weekend_check = DayOfWeekSensor( + task_id='weekend_check', + week_day='Saturday', + use_task_execution_day=True, + dag=dag) + + **Example** (with multiple day using set): :: + + weekend_check = DayOfWeekSensor( + task_id='weekend_check', + week_day={'Saturday', 'Sunday'}, + use_task_execution_day=True, + dag=dag) + + **Example** (with :class:`~airflow.contrib.utils.weekday.WeekDay` enum): :: + + # import WeekDay Enum + from airflow.contrib.utils.weekday import WeekDay + + weekend_check = DayOfWeekSensor( + task_id='weekend_check', + week_day={WeekDay.Saturday, WeekDay.Sunday}, + use_task_execution_day=True, + dag=dag) + + :param week_day: Day of the week to check (full name). Optionally, a set + of days can also be provided using a set. + Example values: + * ``"MONDAY"``, + * ``{"Saturday", "Sunday"}`` + * ``{WeekDay.TUESDAY}`` + * ``{WeekDay.Saturday, WeekDay.Sunday}`` + :type week_day: set or str or WeekDay + :param use_task_execution_day: If ``True``, uses task's execution day to compare + with week_day. Execution Date is Useful for backfilling. + If ``False``, uses system's day of the week. Useful when you + don't want to run anything on weekdays on the system. + :type use_task_execution_day: bool + """ + + @apply_defaults + def __init__(self, week_day, + use_task_execution_day=False, + *args, **kwargs): + super(DayOfWeekSensor, self).__init__(*args, **kwargs) + self.week_day = week_day + self.use_task_execution_day = use_task_execution_day + if isinstance(self.week_day, six.string_types): + self._week_day_num = {WeekDay.get_weekday_number(week_day_str=self.week_day)} + elif isinstance(self.week_day, WeekDay): + self._week_day_num = {self.week_day} + elif isinstance(self.week_day, set): + if all(isinstance(day, six.string_types) for day in self.week_day): + self._week_day_num = {WeekDay.get_weekday_number(day) for day in week_day} + elif all(isinstance(day, WeekDay) for day in self.week_day): + self._week_day_num = self.week_day + else: + raise TypeError( + 'Unsupported Type for week_day parameter: {}. It should be one of str' + ', set or Weekday enum type'.format(type(week_day))) + + def poke(self, context): + self.log.info('Poking until weekday is in %s, Today is %s', + self.week_day, + WeekDay(timezone.utcnow().isoweekday()).name) + if self.use_task_execution_day: + return context['execution_date'].isoweekday() in self._week_day_num + else: + return timezone.utcnow().isoweekday() in self._week_day_num diff --git a/airflow/contrib/utils/weekday.py b/airflow/contrib/utils/weekday.py new file mode 100644 index 0000000000..1fe4ad3a59 --- /dev/null +++ b/airflow/contrib/utils/weekday.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import enum + + +@enum.unique +class WeekDay(enum.IntEnum): + """ + Python Enum containing Days of the Week + """ + MONDAY = 1 + TUESDAY = 2 + WEDNESDAY = 3 + THURSDAY = 4 + FRIDAY = 5 + SATURDAY = 6 + SUNDAY = 7 + + @classmethod + def get_weekday_number(cls, week_day_str): + """ + Return the ISO Week Day Number for a Week Day + + :param week_day_str: Full Name of the Week Day. Example: "Sunday" + :type week_day_str: str + :return: ISO Week Day Number corresponding to the provided Weedkay + """ + sanitized_week_day_str = week_day_str.upper() + + if sanitized_week_day_str not in cls.__members__: + raise AttributeError( + 'Invalid Week Day passed: "{}"'.format(week_day_str) + ) + + return cls[sanitized_week_day_str] diff --git a/docs/code.rst b/docs/code.rst index e890adffec..a670a2d8fd 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -266,6 +266,7 @@ Sensors .. autoclass:: airflow.contrib.sensors.sagemaker_tuning_sensor.SageMakerTuningSensor .. autoclass:: airflow.contrib.sensors.sftp_sensor.SFTPSensor .. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor +.. autoclass:: airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor .. _macros: diff --git a/tests/contrib/sensors/test_weekday_sensor.py b/tests/contrib/sensors/test_weekday_sensor.py new file mode 100644 index 0000000000..55a4c4da4b --- /dev/null +++ b/tests/contrib/sensors/test_weekday_sensor.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from airflow import DAG, configuration, models +from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor +from airflow.contrib.utils.weekday import WeekDay +from airflow.exceptions import AirflowSensorTimeout +from airflow.models import DagBag +from airflow.settings import Session +from airflow.utils.timezone import datetime + +DEFAULT_DATE = datetime(2018, 12, 10) +WEEKDAY_DATE = datetime(2018, 12, 20) +WEEKEND_DATE = datetime(2018, 12, 22) +TEST_DAG_ID = 'weekday_sensor_dag' +DEV_NULL = '/dev/null' + + +class DayOfWeekSensorTests(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + self.dagbag = DagBag( + dag_folder=DEV_NULL, + include_examples=True + ) + self.args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + dag = DAG(TEST_DAG_ID, default_args=self.args) + self.dag = dag + + def tearDown(self): + session = Session() + session.query(models.TaskInstance).filter_by( + dag_id=TEST_DAG_ID).delete() + session.query(models.TaskFail).filter_by( + dag_id=TEST_DAG_ID).delete() + session.commit() + session.close() + + def test_weekday_sensor_true(self): + t = DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day='Thursday', + use_task_execution_day=True, + dag=self.dag) + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + + def test_weekday_sensor_false(self): + t = DayOfWeekSensor( + task_id='weekday_sensor_check_false', + poke_interval=1, + timeout=2, + week_day='Tuesday', + use_task_execution_day=True, + dag=self.dag) + with self.assertRaises(AirflowSensorTimeout): + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + + def test_invalid_weekday_number(self): + invalid_week_day = 'Thsday' + with self.assertRaisesRegexp(AttributeError, + 'Invalid Week Day passed: "{}"'.format( + invalid_week_day)): + DayOfWeekSensor( + task_id='weekday_sensor_invalid_weekday_num', + week_day=invalid_week_day, + use_task_execution_day=True, + dag=self.dag) + + def test_weekday_sensor_with_enum(self): + week_day = WeekDay.THURSDAY + t = DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day=WeekDay.THURSDAY, + use_task_execution_day=True, + dag=self.dag) + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + self.assertEqual(t.week_day, week_day) + + def test_weekday_sensor_with_enum_set(self): + week_day = {WeekDay.THURSDAY} + t = DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day=week_day, + use_task_execution_day=True, + dag=self.dag) + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + self.assertEqual(t.week_day, week_day) + + def test_weekday_sensor_with_enum_set_2_items(self): + week_day = {WeekDay.THURSDAY, WeekDay.FRIDAY} + t = DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day=week_day, + use_task_execution_day=True, + dag=self.dag) + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + self.assertEqual(t.week_day, week_day) + + def test_weekday_sensor_with_string_set(self): + week_day = {'Thursday'} + t = DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day=week_day, + use_task_execution_day=True, + dag=self.dag) + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + self.assertEqual(t.week_day, week_day) + + def test_weekday_sensor_with_string_set_2_items(self): + week_day = {'Thursday', 'Friday'} + t = DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day=week_day, + use_task_execution_day=True, + dag=self.dag) + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + self.assertEqual(t.week_day, week_day) + + def test_weekday_sensor_with_invalid_type(self): + invalid_week_day = ['Thsday'] + with self.assertRaisesRegexp(TypeError, + 'Unsupported Type for week_day parameter:' + ' {}. It should be one of str, set or ' + 'Weekday enum type'.format(type(invalid_week_day)) + ): + DayOfWeekSensor( + task_id='weekday_sensor_check_true', + week_day=invalid_week_day, + use_task_execution_day=True, + dag=self.dag) + + def test_weekday_sensor_timeout_with_set(self): + t = DayOfWeekSensor( + task_id='weekday_sensor_check_false', + poke_interval=1, + timeout=2, + week_day={WeekDay.MONDAY, WeekDay.TUESDAY}, + use_task_execution_day=True, + dag=self.dag) + with self.assertRaises(AirflowSensorTimeout): + t.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) diff --git a/tests/contrib/utils/test_weekday.py b/tests/contrib/utils/test_weekday.py new file mode 100644 index 0000000000..961652a4ea --- /dev/null +++ b/tests/contrib/utils/test_weekday.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from enum import Enum + +from airflow.contrib.utils.weekday import WeekDay + + +class WeekDayTest(unittest.TestCase): + def test_weekday_enum_length(self): + self.assertEqual(len(WeekDay), 7) + + def test_weekday_name_value(self): + weekdays = "MONDAY TUESDAY WEDNESDAY THURSDAY FRIDAY SATURDAY SUNDAY" + weekdays = weekdays.split() + for i, weekday in enumerate(weekdays, start=1): + e = WeekDay(i) + self.assertEqual(e, i) + self.assertEqual(int(e), i) + self.assertEqual(e.name, weekday) + self.assertTrue(e in WeekDay) + self.assertTrue(0 < e < 8) + self.assertTrue(type(e) is WeekDay) + self.assertTrue(isinstance(e, int)) + self.assertTrue(isinstance(e, Enum))