[AIRFLOW-558] Add Support for dag.catchup=(True|False) Option

Added a dag.catchup option and modified the
scheduler to look at the value when scheduling
DagRuns
(by moving dag.start_date up to
dag.previous_schedule),
and added a config option catchup_by_default
(defaults to True) that allows users to set this
to False for all
dags modifying the existing DAGs

In addition, we added a test to jobs.py
(test_dag_catchup_option)

Closes #1830 from
btallman/NoBackfill_clean_feature
This commit is contained in:
Benjamin Tallman 2017-01-13 12:39:55 +01:00 коммит произвёл Bolke de Bruin
Родитель e0f5c0cb8b
Коммит 1caaceb388
6 изменённых файлов: 252 добавлений и 10 удалений

Просмотреть файл

@ -356,6 +356,14 @@ child_process_log_directory = /tmp/airflow/scheduler/logs
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300
# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True
# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
statsd_host = localhost
@ -486,6 +494,8 @@ job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
authenticate = true
max_threads = 2
catchup_by_default = True
scheduler_zombie_task_threshold = 300
"""

Просмотреть файл

@ -729,6 +729,25 @@ class SchedulerJob(BaseJob):
if dag.schedule_interval == '@once' and last_scheduled_run:
return None
# don't do scheduler catchup for dag's that don't have dag.catchup = True
if not dag.catchup:
# The logic is that we move start_date up until
# one period before, so that datetime.now() is AFTER
# the period end, and the job can be created...
now = datetime.now()
next_start = dag.following_schedule(now)
last_start = dag.previous_schedule(now)
if next_start <= now:
new_start = last_start
else:
new_start = dag.previous_schedule(last_start)
if dag.start_date:
if new_start >= dag.start_date:
dag.start_date = new_start
else:
dag.start_date = new_start
next_run_date = None
if not last_scheduled_run:
# First run
@ -756,6 +775,10 @@ class SchedulerJob(BaseJob):
self.logger.debug("Dag start date: {}. Next run date: {}"
.format(dag.start_date, next_run_date))
# don't ever schedule in the future
if next_run_date > datetime.now():
return
# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':

Просмотреть файл

@ -1023,12 +1023,28 @@ class TaskInstance(Base):
@provide_session
def previous_ti(self, session=None):
""" The task instance for the task that ran before this task instance """
return session.query(TaskInstance).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task.task_id,
TaskInstance.execution_date ==
self.task.dag.previous_schedule(self.execution_date),
).first()
dag = self.task.dag
if dag:
dr = self.get_dagrun(session=session)
if not dr:
# Means that this TI is NOT being run from a DR, but from a catchup
previous_scheduled_date = dag.previous_schedule(self.execution_date)
if not previous_scheduled_date:
return None
else:
return TaskInstance(task=self.task, execution_date=previous_scheduled_date)
if dag.catchup:
last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None
else:
last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
if last_dagrun:
return last_dagrun.get_task_instance(self.task_id, session=session)
return None
@provide_session
def are_dependencies_met(
@ -2540,6 +2556,8 @@ class DAG(BaseDag, LoggingMixin):
:type sla_miss_callback: types.FunctionType
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
:type orientation: string
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
"type catchup: bool"
"""
def __init__(
@ -2557,6 +2575,7 @@ class DAG(BaseDag, LoggingMixin):
dagrun_timeout=None,
sla_miss_callback=None,
orientation=configuration.get('webserver', 'dag_orientation'),
catchup=configuration.getboolean('scheduler', 'catchup_by_default'),
params=None):
self.user_defined_macros = user_defined_macros
@ -2597,6 +2616,7 @@ class DAG(BaseDag, LoggingMixin):
self.dagrun_timeout = dagrun_timeout
self.sla_miss_callback = sla_miss_callback
self.orientation = orientation
self.catchup = catchup
self._comps = {
'dag_id',
@ -3847,6 +3867,29 @@ class DagRun(Base):
return self.dag
@provide_session
def get_previous_dagrun(self, session=None):
"""The previous DagRun, if there is one"""
return session.query(DagRun).filter(
DagRun.dag_id == self.dag_id,
DagRun.execution_date < self.execution_date
).order_by(
DagRun.execution_date.desc()
).first()
@provide_session
def get_previous_scheduled_dagrun(self, session=None):
"""The previous, SCHEDULED DagRun, if there is one"""
if not self.dag:
return None
return session.query(DagRun).filter(
DagRun.dag_id == self.dag_id,
DagRun.execution_date == self.dag.previous_schedule(self.execution_date)
).first()
@provide_session
def update_state(self, session=None):
"""

Просмотреть файл

@ -39,10 +39,22 @@ class PrevDagrunDep(BaseTIDep):
raise StopIteration
# Don't depend on the previous task instance if we are the first task
if ti.execution_date == ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for it's task.")
raise StopIteration
dag = ti.task.dag
if dag.catchup:
if ti.execution_date == ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration
else:
dr = ti.get_dagrun()
last_dagrun = dr.get_previous_dagrun() if dr else None
if not last_dagrun:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration
previous_ti = ti.previous_ti
if not previous_ti:

Просмотреть файл

@ -17,6 +17,9 @@ the run stamped ``2016-01-01`` will be trigger soon after ``2016-01-01T23:59``.
In other words, the job instance is started once the period it covers
has ended.
**Let's Repeat That** The scheduler runs your job one ``schedule_interval`` AFTER the
start date, at the END of the period.
The scheduler starts an instance of the executor specified in the your
``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be
executed as subprocesses; in the case of ``CeleryExecutor`` and
@ -72,6 +75,55 @@ should be triggered and come to a crawl. It might also create undesired
processing when changing the shape of your DAG, by say adding in new
tasks.
Backfill and Catchup
''''''''''''''''''''
An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a
series of intervals which the scheduler turn into individual Dag Runs and execute. A key capability of
Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine
the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any
interval that has not been run (or has been cleared). This concept is called Catchup.
If your DAG is written to handle it's own catchup (IE not limited to the interval, but instead to "Now"
for instance.), then you will want to turn catchup off (Either on the DAG itself with ``dag.catchup =
False``) or by default at the configuration file level with ``catchup_by_default = False``. What this
will do, is to instruct the scheduler to only create a DAG Run for the most current instance of the DAG
interval series.
.. code:: python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}
dag = DAG('tutorial', catchup=False, default_args=default_args)
In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM, (or from the
command line), a single DAG Run will be created, with an ``execution_date`` of 2016-01-01, and the next
one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02.
If the ``dag.catchup`` value had been True instead, the scheduler would have created a DAG Run for each
completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval
hasn't completed) and the scheduler will execute them sequentially. This behavior is great for atomic
datasets that can easily be split into periods. Turning catchup off is great if your DAG Runs perform
backfill internally.
External Triggers
'''''''''''''''''

Просмотреть файл

@ -1101,3 +1101,105 @@ class SchedulerJobTest(unittest.TestCase):
running_date = 'Except'
self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')
def test_dag_catchup_option(self):
"""
Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date
"""
now = datetime.datetime.now()
six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
three_minutes_ago = now - datetime.timedelta(minutes=3)
two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2)
START_DATE = six_hours_ago_to_the_hour
DAG_NAME1 = 'no_catchup_test1'
DAG_NAME2 = 'no_catchup_test2'
DAG_NAME3 = 'no_catchup_test3'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': START_DATE
}
dag1 = DAG(DAG_NAME1,
schedule_interval='* * * * *',
max_active_runs=1,
default_args=default_args
)
default_catchup = configuration.getboolean('scheduler', 'catchup_by_default')
# Test configs have catchup by default ON
self.assertEqual(default_catchup, True)
# Correct default?
self.assertEqual(dag1.catchup, True)
dag2 = DAG(DAG_NAME2,
schedule_interval='* * * * *',
max_active_runs=1,
catchup=False,
default_args=default_args
)
run_this_1 = DummyOperator(task_id='run_this_1', dag=dag2)
run_this_2 = DummyOperator(task_id='run_this_2', dag=dag2)
run_this_2.set_upstream(run_this_1)
run_this_3 = DummyOperator(task_id='run_this_3', dag=dag2)
run_this_3.set_upstream(run_this_2)
session = settings.Session()
orm_dag = DagModel(dag_id=dag2.dag_id)
session.merge(orm_dag)
session.commit()
session.close()
scheduler = SchedulerJob()
dag2.clear()
dr = scheduler.create_dag_run(dag2)
# We had better get a dag run
self.assertIsNotNone(dr)
# The DR should be scheduled in the last 3 minutes, not 6 hours ago
self.assertGreater(dr.execution_date, three_minutes_ago)
# The DR should be scheduled BEFORE now
self.assertLess(dr.execution_date, datetime.datetime.now())
dag3 = DAG(DAG_NAME3,
schedule_interval='@hourly',
max_active_runs=1,
catchup=False,
default_args=default_args
)
run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
run_this_2.set_upstream(run_this_1)
run_this_3 = DummyOperator(task_id='run_this_3', dag=dag3)
run_this_3.set_upstream(run_this_2)
session = settings.Session()
orm_dag = DagModel(dag_id=dag3.dag_id)
session.merge(orm_dag)
session.commit()
session.close()
scheduler = SchedulerJob()
dag3.clear()
dr = None
dr = scheduler.create_dag_run(dag3)
# We had better get a dag run
self.assertIsNotNone(dr)
# The DR should be scheduled in the last two hours, not 6 hours ago
self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)
# The DR should be scheduled BEFORE now
self.assertLess(dr.execution_date, datetime.datetime.now())