Merge pull request #261 from airbnb/dep_rule

Making the dependency engine more flexible
Maxime Beauchemin 2015-08-24 23:29:38 -07:00
Parents f0f41101cb da606caaed
Commit 5332ebfcd6
4 changed files with 80 additions and 13 deletions

View file

@@ -30,7 +30,7 @@ from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
 from airflow.configuration import conf
 from airflow.utils import (
     AirflowException, State, apply_defaults, provide_session,
-    is_container, as_tuple)
+    is_container, as_tuple, TriggerRule)

 Base = declarative_base()
 ID_LEN = 250
@@ -677,6 +677,7 @@ class TaskInstance(Base):
         :type flag_upstream_failed: boolean
         """
         TI = TaskInstance
+        TR = TriggerRule

         # Using the session if passed as param
         session = main_session or settings.Session()
@@ -702,7 +703,9 @@ class TaskInstance(Base):
             return False

         # Checking that all upstream dependencies have succeeded
-        if task._upstream_list:
+        if not task._upstream_list or task.trigger_rule == TR.DUMMY:
+            return True
+        else:
             upstream_task_ids = [t.task_id for t in task._upstream_list]
             qry = (
                 session
@@ -711,6 +714,10 @@ class TaskInstance(Base):
                         case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
                     func.coalesce(func.sum(
                         case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
                     func.count(TI.task_id),
                 )
                 .filter(
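
For readers skimming the diff: the widened query now rolls each direct
parent's terminal state into five aggregate counts. A hypothetical
in-memory equivalent of that bookkeeping (not part of this commit):

    # Stand-in for the SQLAlchemy aggregation above: given the states of
    # a task's finished direct parents, produce the same five counts.
    def count_parent_states(parent_states):
        successes = sum(1 for s in parent_states if s == 'success')
        skipped = sum(1 for s in parent_states if s == 'skipped')
        failed = sum(1 for s in parent_states if s == 'failed')
        upstream_failed = sum(
            1 for s in parent_states if s == 'upstream_failed')
        # The real query filters to terminal states, so `done` is simply
        # the number of parents that have finished in any way.
        done = len(parent_states)
        return successes, skipped, failed, upstream_failed, done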
@@ -722,27 +729,36 @@ class TaskInstance(Base):
                         State.UPSTREAM_FAILED, State.SKIPPED]),
                 )
             )
-            successes, skipped, done = qry[0]
+            successes, skipped, failed, upstream_failed, done = qry.first()

             if flag_upstream_failed:
                 if skipped:
                     self.state = State.SKIPPED
                     self.start_date = datetime.now()
                     self.end_date = datetime.now()
                     session.merge(self)
                 elif successes < done >= len(task._upstream_list):
                     self.state = State.UPSTREAM_FAILED
                     self.start_date = datetime.now()
                     self.end_date = datetime.now()
                     session.merge(self)

-            if successes < len(task._upstream_list):
-                return False
+            if task.trigger_rule == TR.ONE_SUCCESS and successes > 0:
+                return True
+            elif (task.trigger_rule == TR.ONE_FAILED and
+                    (failed + upstream_failed) > 0):
+                return True
+            elif (task.trigger_rule == TR.ALL_SUCCESS and
+                    successes == len(task._upstream_list)):
+                return True
+            elif (task.trigger_rule == TR.ALL_FAILED and
+                    failed + upstream_failed == len(task._upstream_list)):
+                return True
+            elif (task.trigger_rule == TR.ALL_DONE and
+                    done == len(task._upstream_list)):
+                return True

         if not main_session:
             session.commit()
             session.close()
-        return True
+        return False

     def __repr__(self):
         return (
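
Taken as a whole, the new branch logic is a pure function of those five
counts plus the rule name. A hypothetical distillation of the decision
table (same string constants; not part of this commit, which handles
``dummy`` before the query even runs):

    # Decide whether a task is runnable given its trigger rule and the
    # aggregate state counts of its direct parents.
    def is_runnable(rule, successes, failed, upstream_failed, done,
                    n_parents):
        if rule == 'dummy':
            return True                  # dependencies are just for show
        if rule == 'one_success':
            return successes > 0
        if rule == 'one_failed':
            return failed + upstream_failed > 0
        if rule == 'all_success':
            return successes == n_parents
        if rule == 'all_failed':
            return failed + upstream_failed == n_parents
        if rule == 'all_done':
            return done == n_parents
        return False                     # unknown rule: stay blocked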
@@ -1239,6 +1255,14 @@ class BaseOperator(object):
     :param on_success_callback: much like the ``on_failure_callback`` except
         that it is executed when the task succeeds.
     :type on_success_callback: callable
+    :param trigger_rule: defines the rule by which dependencies are applied
+        for the task to get triggered. Options are:
+        ``{ all_success | all_failed | all_done | one_success |
+        one_failed | dummy }``
+        default is ``all_success``. Options can be set as a string or
+        using the constants defined in the static class
+        ``airflow.utils.TriggerRule``
+    :type trigger_rule: str
     """

     # For derived classes to define which fields will get jinjaified
@@ -1276,6 +1300,7 @@ class BaseOperator(object):
             on_failure_callback=None,
             on_success_callback=None,
             on_retry_callback=None,
+            trigger_rule=TriggerRule.ALL_SUCCESS,
             *args,
             **kwargs):
@@ -1288,6 +1313,7 @@ class BaseOperator(object):
         self.email_on_failure = email_on_failure
         self.start_date = start_date
         self.end_date = end_date
+        self.trigger_rule = trigger_rule
         self.depends_on_past = depends_on_past
         self.wait_for_downstream = wait_for_downstream
         if wait_for_downstream:

View file

@@ -37,6 +37,15 @@ class AirflowSensorTimeout(Exception):
     pass


+class TriggerRule(object):
+    ALL_SUCCESS = 'all_success'
+    ALL_FAILED = 'all_failed'
+    ALL_DONE = 'all_done'
+    ONE_SUCCESS = 'one_success'
+    ONE_FAILED = 'one_failed'
+    DUMMY = 'dummy'
+
+
 class State(object):
     """
     Static class with task instance states constants and color method to
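
Since these class attributes are plain strings, an operator accepts either
the constant or its literal value; a small sketch (the ``DummyOperator``
import path is assumed from the API of this era):

    from airflow.operators import DummyOperator
    from airflow.utils import TriggerRule

    # The two forms are interchangeable: TriggerRule.ALL_DONE == 'all_done'.
    t1 = DummyOperator(task_id='via_constant', trigger_rule=TriggerRule.ALL_DONE)
    t2 = DummyOperator(task_id='via_string', trigger_rule='all_done')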

View file

@@ -206,3 +206,28 @@
 detailing the list of tasks that missed their SLA. The event is also recorded
 in the database and made available in the web UI under ``Browse->Missed SLAs``
 where events can be analyzed and documented.
+
+Trigger Rules
+'''''''''''''
+
+Though the normal workflow behavior is to trigger tasks when all their
+directly upstream tasks have succeeded, Airflow allows for more complex
+dependency settings.
+
+All operators have a ``trigger_rule`` argument which defines the rule by which
+the generated task gets triggered. The default value for ``trigger_rule`` is
+``all_success`` and can be defined as "trigger this task when all directly
+upstream tasks have succeeded". All other rules described here are based
+on direct parent tasks and are values that can be passed to any operator
+while creating tasks:
+
+* ``all_success``: (default) all parents have succeeded
+* ``all_failed``: all parents are in a ``failed`` or ``upstream_failed`` state
+* ``all_done``: all parents are done with their execution
+* ``one_failed``: fires as soon as at least one parent has failed; it does not wait for all parents to be done
+* ``one_success``: fires as soon as at least one parent succeeds; it does not wait for all parents to be done
+* ``dummy``: dependencies are just for show; trigger at will
+
+Note that these can be used in conjunction with ``depends_on_past`` (boolean)
+that, when set to ``True``, keeps a task from getting triggered if the
+previous schedule for the task hasn't succeeded.
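
For instance, a minimal sketch of a cleanup task that should run whether
its parents succeed or fail (the ``DAG`` and ``DummyOperator`` import
paths are assumed from the API of this era):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators import DummyOperator

    dag = DAG('trigger_rules_example', start_date=datetime(2015, 1, 1))

    first = DummyOperator(task_id='first', dag=dag)
    second = DummyOperator(task_id='second', dag=dag)

    # `all_done` runs once both parents are done, regardless of outcome.
    cleanup = DummyOperator(task_id='cleanup', trigger_rule='all_done', dag=dag)
    cleanup.set_upstream(first)
    cleanup.set_upstream(second)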

View file

@@ -33,3 +33,10 @@ Here are some of the common causes:
 You may also want to read the Scheduler section of the docs and make
 sure you fully understand how it proceeds.
+
+**How do I trigger tasks based on another task's failure?**
+
+Check out the ``Trigger Rules`` section in the Concepts section of the
+documentation.
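
A minimal sketch, assuming the import paths of this era: a ``one_failed``
task fires as soon as any direct parent fails.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators import DummyOperator

    dag = DAG('failure_alert_example', start_date=datetime(2015, 1, 1))

    extract = DummyOperator(task_id='extract', dag=dag)
    load = DummyOperator(task_id='load', dag=dag)

    # Fires on the first failed parent; does not wait for the other one.
    notify = DummyOperator(task_id='notify', trigger_rule='one_failed', dag=dag)
    notify.set_upstream(extract)
    notify.set_upstream(load)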