Create custom timetable to run shredder every 4 weeks (#1771)

This commit is contained in:
Daniel Thorn 2023-08-14 15:55:39 -07:00 коммит произвёл GitHub
Родитель 5856924b3c
Коммит c443fb32f5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 147 добавлений и 5 удалений

Просмотреть файл

@ -4,6 +4,8 @@ from airflow import DAG
from utils.gcp import gke_command
from utils.tags import Tag
from plugins.timetable import MultiWeekTimetable
docs = """
### shredder
@ -34,7 +36,7 @@ default_args = {
"owner": "dthorn@mozilla.com",
"depends_on_past": True,
"start_date": datetime(2023, 5, 16),
"catchup": True,
"catchup": False,
"email": [
"telemetry-alerts@mozilla.com",
"dthorn@mozilla.com",
@ -53,7 +55,9 @@ tags = [
dag = DAG(
"shredder",
default_args=default_args,
schedule_interval=timedelta(days=28),
# 4 week intervals from start_date. This is similar to
# schedule_interval=timedelta(days=28), except it should actually work.
schedule=MultiWeekTimetable(num_weeks=4),
doc_md=docs,
tags=tags,
)
@ -62,12 +66,12 @@ base_command = [
"script/shredder_delete",
"--state-table=moz-fx-data-shredder.shredder_state.shredder_state",
"--task-table=moz-fx-data-shredder.shredder_state.tasks",
# dags run schedule_interval after ds, and end date should be one day
# before the dag runs, so 28-1 = 27 days after ds.
# dags run one schedule interval after ds, end date should be one day before the dag
# runs, and schedule intervals are 4 weeks = 28 days, so 28-1 = 27 days after ds
"--end-date={{macros.ds_add(ds, 27)}}",
# start date should be two schedule intervals before end date, to avoid
# race conditions with downstream tables and pings received shortly after a
# deletion request.
# deletion request. schedule intervals are 4 weeks = 28 days.
# This is temporarily increased to 4 intervals, in order handle outstanding backlog
"--start-date={{macros.ds_add(ds, 27-28*4)}}",
# non-dml statements use LEFT JOIN instead of IN to filter rows, which takes about

58
plugins/timetable.py Normal file
Просмотреть файл

@ -0,0 +1,58 @@
"""Plugin for alternative timetables that cannot be trivially defined via cron expressions."""
from datetime import timedelta
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import UTC, DateTime, Time
class MultiWeekTimetable(Timetable):
def __init__(self, *, num_weeks: int, time: Time = Time.min):
self.interval_delta = timedelta(days=7 * num_weeks)
# only enforced for automated data intervals
self.time = time
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
return DataInterval(start=run_after - self.interval_delta, end=run_after)
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if restriction.earliest is None: # No start_date specified. Don't schedule.
return None
# Find the first run on the regular schedule.
next_end = (
DateTime.combine(restriction.earliest, self.time).replace(tzinfo=UTC)
+ self.interval_delta
)
max_end = next_end
if last_automated_data_interval is not None:
# There was a previous run on the regular schedule.
# Return the next interval after last_automated_data_interval.end that is
# aligned with restriction.earliest and self.time
max_end = last_automated_data_interval.end + self.interval_delta
elif not restriction.catchup:
# This is the first ever run on the regular schedule, and catchup is not
# enabled. Return the last complete interval before now.
max_end = DateTime.utcnow()
if next_end < max_end:
# Return the last complete interval on or before max_end. Use integer
# division on the number of whole days rather than deal with any corner
# cases related to leap seconds and partial days.
skip_intervals = (max_end - next_end).days // self.interval_delta.days
next_end = next_end + (self.interval_delta * skip_intervals)
if restriction.latest is not None and next_end > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_end - self.interval_delta, end=next_end)
class MozillaTimetablePlugin(AirflowPlugin):
name = "mozilla_timetable_plugin"
timetables = [MultiWeekTimetable]

80
tests/test_timetable.py Normal file
Просмотреть файл

@ -0,0 +1,80 @@
from unittest import mock
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
from pendulum import UTC, DateTime, Time
from plugins.timetable import MultiWeekTimetable
def test_manual_interval():
tt = MultiWeekTimetable(num_weeks=4)
actual = tt.infer_manual_data_interval(run_after=DateTime(2023, 1, 29))
expected = DataInterval(start=DateTime(2023, 1, 1), end=DateTime(2023, 1, 29))
assert actual == expected
def test_first_automated_interval():
tt = MultiWeekTimetable(num_weeks=4, time=Time(hour=4))
actual = tt.next_dagrun_info(
last_automated_data_interval=None,
restriction=TimeRestriction(
earliest=DateTime(2023, 1, 1), latest=None, catchup=True
),
)
expected = DagRunInfo.interval(
start=DateTime(2023, 1, 1, 4, tzinfo=UTC),
end=DateTime(2023, 1, 29, 4, tzinfo=UTC),
)
assert actual == expected
def test_first_automated_interval_no_catchup():
tt = MultiWeekTimetable(num_weeks=4)
with mock.patch.object(
DateTime, "utcnow", return_value=DateTime(2023, 2, 28, tzinfo=UTC)
):
actual = tt.next_dagrun_info(
last_automated_data_interval=None,
restriction=TimeRestriction(
earliest=DateTime(2023, 1, 1), latest=None, catchup=False
),
)
expected = DagRunInfo.interval(
start=DateTime(2023, 1, 29, tzinfo=UTC), end=DateTime(2023, 2, 26, tzinfo=UTC)
)
assert actual == expected
def test_next_automated_interval():
tt = MultiWeekTimetable(num_weeks=4)
actual = tt.next_dagrun_info(
last_automated_data_interval=DataInterval(
start=DateTime(2023, 1, 29, tzinfo=UTC),
end=DateTime(2023, 2, 26, tzinfo=UTC),
),
restriction=TimeRestriction(
earliest=DateTime(2023, 1, 1),
latest=DateTime(2023, 3, 26, tzinfo=UTC),
catchup=False,
),
)
expected = DagRunInfo.interval(
start=DateTime(2023, 2, 26, tzinfo=UTC), end=DateTime(2023, 3, 26, tzinfo=UTC)
)
assert actual == expected
def test_last_automated_interval():
tt = MultiWeekTimetable(num_weeks=4)
actual = tt.next_dagrun_info(
last_automated_data_interval=DataInterval(
start=DateTime(2023, 1, 29, tzinfo=UTC),
end=DateTime(2023, 2, 26, tzinfo=UTC),
),
restriction=TimeRestriction(
earliest=DateTime(2023, 1, 1),
latest=DateTime(2023, 2, 26, tzinfo=UTC),
catchup=False,
),
)
assert actual is None