Add missing serialization for MultiWeekTimetable (#1782)

This commit is contained in:
Daniel Thorn 2023-08-15 14:58:17 -07:00 коммит произвёл GitHub
Родитель 96b51ca071
Коммит d15c87b143
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 9 добавлений и 0 удалений

Просмотреть файл

@ -1,6 +1,7 @@
"""Plugin for alternative timetables that cannot be trivially defined via cron expressions."""
from datetime import timedelta
from typing import Any
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
@ -9,6 +10,7 @@ from pendulum import UTC, DateTime, Time
class MultiWeekTimetable(Timetable):
def __init__(self, *, num_weeks: int, time: Time = Time.min):
self.num_weeks = num_weeks
self.interval_delta = timedelta(days=7 * num_weeks)
# only enforced for automated data intervals
self.time = time
@ -52,6 +54,13 @@ class MultiWeekTimetable(Timetable):
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_end - self.interval_delta, end=next_end)
def serialize(self) -> dict[str, Any]:
return {"num_weeks": self.num_weeks, "time": self.time.isoformat()}
@classmethod
def deserialize(cls, value: dict[str, Any]) -> Timetable:
return cls(num_weeks=value["num_weeks"], time=Time.fromisoformat(value["time"]))
class MozillaTimetablePlugin(AirflowPlugin):
name = "mozilla_timetable_plugin"