[AIRFLOW-3560] Add DayOfWeek Sensor (#4363)

* [AIRFLOW-3560] Add WeekEnd & DayOfWeek Sensors

* Change to using Enum

* Fix Docstring

* Refactor into a Single Sensor
This commit is contained in:
Kaxil Naik 2019-01-03 09:35:35 +00:00 коммит произвёл Fokko Driesprong
Родитель dd1bf84938
Коммит 55ec82439e
5 изменённых файлов: 360 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import six
from airflow.contrib.utils.weekday import WeekDay
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
class DayOfWeekSensor(BaseSensorOperator):
    """
    Waits until the first specified day of the week. For example, if the execution
    day of the task is '2018-12-22' (Saturday) and you pass 'FRIDAY', the task will wait
    until next Friday.

    **Example** (with single day): ::

        weekend_check = DayOfWeekSensor(
            task_id='weekend_check',
            week_day='Saturday',
            use_task_execution_day=True,
            dag=dag)

    **Example** (with multiple day using set): ::

        weekend_check = DayOfWeekSensor(
            task_id='weekend_check',
            week_day={'Saturday', 'Sunday'},
            use_task_execution_day=True,
            dag=dag)

    **Example** (with :class:`~airflow.contrib.utils.weekday.WeekDay` enum): ::

        # import WeekDay Enum
        from airflow.contrib.utils.weekday import WeekDay

        weekend_check = DayOfWeekSensor(
            task_id='weekend_check',
            week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
            use_task_execution_day=True,
            dag=dag)

    :param week_day: Day of the week to check (full name, case-insensitive).
        Optionally, several days can be provided using a set; the set may
        contain day names, ``WeekDay`` members, or both.
        Example values:

            * ``"MONDAY"``,
            * ``{"Saturday", "Sunday"}``
            * ``{WeekDay.TUESDAY}``
            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``

    :type week_day: set or str or WeekDay
    :param use_task_execution_day: If ``True``, uses task's execution day to compare
        with week_day. Execution Date is Useful for backfilling.
        If ``False``, uses system's day of the week. Useful when you
        don't want to run anything on weekdays on the system.
    :type use_task_execution_day: bool
    :raises TypeError: if ``week_day`` (or an element of the set) is neither a
        string nor a ``WeekDay`` member
    :raises AttributeError: if a day name is not a valid day of the week
    """

    @apply_defaults
    def __init__(self, week_day,
                 use_task_execution_day=False,
                 *args, **kwargs):
        super(DayOfWeekSensor, self).__init__(*args, **kwargs)
        # Keep the value exactly as passed in; it is only used for logging.
        self.week_day = week_day
        self.use_task_execution_day = use_task_execution_day
        # Normalise every accepted input shape to a set of WeekDay members.
        # Each element is converted on its own, so a set mixing day names and
        # WeekDay members is handled, and an unsupported element fails here
        # rather than leaving ``_week_day_num`` unset until ``poke`` runs.
        if isinstance(week_day, set):
            self._week_day_num = {self._to_week_day(day) for day in week_day}
        else:
            self._week_day_num = {self._to_week_day(week_day)}

    @staticmethod
    def _to_week_day(day):
        """
        Convert a single day given as a name or a ``WeekDay`` member to ``WeekDay``.

        :param day: Full name of the day, or a ``WeekDay`` member
        :type day: str or WeekDay
        :return: the corresponding ``WeekDay`` member
        :raises TypeError: if ``day`` is neither a string nor a ``WeekDay`` member
        """
        if isinstance(day, WeekDay):
            return day
        if isinstance(day, six.string_types):
            return WeekDay.get_weekday_number(week_day_str=day)
        raise TypeError(
            'Unsupported Type for week_day parameter: {}. It should be one of str'
            ', set or Weekday enum type'.format(type(day)))

    def poke(self, context):
        """
        Check whether the day being compared is one of the configured week days.

        :param context: task context; ``context['execution_date']`` is read when
            ``use_task_execution_day`` is ``True``
        :return: ``True`` if the ISO week day number of the compared date is
            among the configured days, ``False`` otherwise
        """
        # The logged "today" is always the current UTC day, even when the
        # comparison below uses the execution date instead.
        self.log.info('Poking until weekday is in %s, Today is %s',
                      self.week_day,
                      WeekDay(timezone.utcnow().isoweekday()).name)
        if self.use_task_execution_day:
            return context['execution_date'].isoweekday() in self._week_day_num
        return timezone.utcnow().isoweekday() in self._week_day_num

Просмотреть файл

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import enum
@enum.unique
class WeekDay(enum.IntEnum):
    """
    Integer enum of the days of the week, numbered from MONDAY = 1 to SUNDAY = 7.
    """
    MONDAY = 1
    TUESDAY = 2
    WEDNESDAY = 3
    THURSDAY = 4
    FRIDAY = 5
    SATURDAY = 6
    SUNDAY = 7

    @classmethod
    def get_weekday_number(cls, week_day_str):
        """
        Look up the enum member for a week day given by its full name.

        The name is upper-cased before the lookup, so matching is
        case-insensitive.

        :param week_day_str: Full name of the week day. Example: "Sunday"
        :type week_day_str: str
        :return: the matching member; its integer value is the ISO week day number
        :raises AttributeError: if the name does not match any member
        """
        # A single dictionary lookup; members are never None, so None
        # unambiguously means "no such day".
        member = cls.__members__.get(week_day_str.upper())
        if member is None:
            raise AttributeError(
                'Invalid Week Day passed: "{}"'.format(week_day_str)
            )
        return member

Просмотреть файл

@ -266,6 +266,7 @@ Sensors
.. autoclass:: airflow.contrib.sensors.sagemaker_tuning_sensor.SageMakerTuningSensor
.. autoclass:: airflow.contrib.sensors.sftp_sensor.SFTPSensor
.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
.. autoclass:: airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor
.. _macros:

Просмотреть файл

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
from airflow import DAG, configuration, models
from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor
from airflow.contrib.utils.weekday import WeekDay
from airflow.exceptions import AirflowSensorTimeout
from airflow.models import DagBag
from airflow.settings import Session
from airflow.utils.timezone import datetime
# Used as the 'start_date' default argument of the test DAG.
DEFAULT_DATE = datetime(2018, 12, 10)
# A Thursday; the tests run the sensor with this as start and end date.
WEEKDAY_DATE = datetime(2018, 12, 20)
# A Saturday.
WEEKEND_DATE = datetime(2018, 12, 22)
# Id of the DAG created in setUp; tearDown deletes rows with this dag_id.
TEST_DAG_ID = 'weekday_sensor_dag'
# Passed as the DagBag dag_folder in setUp.
DEV_NULL = '/dev/null'
class DayOfWeekSensorTests(unittest.TestCase):
    """
    Tests for DayOfWeekSensor.

    Every sensor is created with ``use_task_execution_day=True`` and, when it
    is run, executed for WEEKDAY_DATE (a Thursday).
    """

    def setUp(self):
        configuration.load_test_config()
        self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
        self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
        self.dag = DAG(TEST_DAG_ID, default_args=self.args)

    def tearDown(self):
        # Remove the task instance and task failure rows left by the test DAG.
        session = Session()
        for model in (models.TaskInstance, models.TaskFail):
            session.query(model).filter_by(dag_id=TEST_DAG_ID).delete()
        session.commit()
        session.close()

    def _make_sensor(self, task_id, week_day, **sensor_kwargs):
        """Create a sensor on the test DAG that compares against the execution day."""
        return DayOfWeekSensor(
            task_id=task_id,
            week_day=week_day,
            use_task_execution_day=True,
            dag=self.dag,
            **sensor_kwargs)

    @staticmethod
    def _run_on_weekday_date(sensor):
        """Run the sensor for the single execution date WEEKDAY_DATE."""
        sensor.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE,
                   ignore_ti_state=True)

    def _check_sensor_succeeds(self, week_day):
        """Run a sensor for ``week_day`` and check it keeps the value it was given."""
        sensor = self._make_sensor('weekday_sensor_check_true', week_day)
        self._run_on_weekday_date(sensor)
        self.assertEqual(sensor.week_day, week_day)

    def test_weekday_sensor_true(self):
        sensor = self._make_sensor('weekday_sensor_check_true', 'Thursday')
        self._run_on_weekday_date(sensor)

    def test_weekday_sensor_false(self):
        sensor = self._make_sensor(
            'weekday_sensor_check_false', 'Tuesday',
            poke_interval=1, timeout=2)
        with self.assertRaises(AirflowSensorTimeout):
            self._run_on_weekday_date(sensor)

    def test_invalid_weekday_number(self):
        invalid_week_day = 'Thsday'
        expected_message = 'Invalid Week Day passed: "{}"'.format(invalid_week_day)
        with self.assertRaisesRegexp(AttributeError, expected_message):
            self._make_sensor('weekday_sensor_invalid_weekday_num',
                              invalid_week_day)

    def test_weekday_sensor_with_enum(self):
        self._check_sensor_succeeds(WeekDay.THURSDAY)

    def test_weekday_sensor_with_enum_set(self):
        self._check_sensor_succeeds({WeekDay.THURSDAY})

    def test_weekday_sensor_with_enum_set_2_items(self):
        self._check_sensor_succeeds({WeekDay.THURSDAY, WeekDay.FRIDAY})

    def test_weekday_sensor_with_string_set(self):
        self._check_sensor_succeeds({'Thursday'})

    def test_weekday_sensor_with_string_set_2_items(self):
        self._check_sensor_succeeds({'Thursday', 'Friday'})

    def test_weekday_sensor_with_invalid_type(self):
        # A list is not an accepted container for week_day.
        invalid_week_day = ['Thsday']
        expected_message = (
            'Unsupported Type for week_day parameter:'
            ' {}. It should be one of str, set or '
            'Weekday enum type'.format(type(invalid_week_day)))
        with self.assertRaisesRegexp(TypeError, expected_message):
            self._make_sensor('weekday_sensor_check_true', invalid_week_day)

    def test_weekday_sensor_timeout_with_set(self):
        sensor = self._make_sensor(
            'weekday_sensor_check_false',
            {WeekDay.MONDAY, WeekDay.TUESDAY},
            poke_interval=1, timeout=2)
        with self.assertRaises(AirflowSensorTimeout):
            self._run_on_weekday_date(sensor)

Просмотреть файл

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from enum import Enum
from airflow.contrib.utils.weekday import WeekDay
class WeekDayTest(unittest.TestCase):
    """Checks the size, names, values and types of the WeekDay enum members."""

    def test_weekday_enum_length(self):
        self.assertEqual(len(WeekDay), 7)

    def test_weekday_name_value(self):
        # Names listed in the order of their expected values, starting at 1.
        expected_names = (
            "MONDAY",
            "TUESDAY",
            "WEDNESDAY",
            "THURSDAY",
            "FRIDAY",
            "SATURDAY",
            "SUNDAY",
        )
        for number, expected_name in enumerate(expected_names, start=1):
            member = WeekDay(number)
            self.assertEqual(member, number)
            self.assertEqual(int(member), number)
            self.assertEqual(member.name, expected_name)
            self.assertTrue(member in WeekDay)
            self.assertTrue(0 < member < 8)
            self.assertTrue(type(member) is WeekDay)
            self.assertTrue(isinstance(member, int))
            self.assertTrue(isinstance(member, Enum))