From 96f1dd1fcffba179c2c9ed4a055a483cd3ec4264 Mon Sep 17 00:00:00 2001 From: Maxime Date: Fri, 14 Aug 2015 17:25:28 +0000 Subject: [PATCH 1/4] Making the dependency engine more flexible --- airflow/models.py | 52 +++++++++++++++++++++++++++++++++++------------ airflow/utils.py | 9 ++++++++ 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index 3cf7942c24..ef9f59519e 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -30,7 +30,7 @@ from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor from airflow.configuration import conf from airflow.utils import ( AirflowException, State, apply_defaults, provide_session, - is_container, as_tuple) + is_container, as_tuple, TriggerRule) Base = declarative_base() ID_LEN = 250 @@ -650,6 +650,7 @@ class TaskInstance(Base): :type flag_upstream_failed: boolean """ TI = TaskInstance + TR = TriggerRule # Using the session if passed as param session = main_session or settings.Session() @@ -675,7 +676,9 @@ class TaskInstance(Base): return False # Checking that all upstream dependencies have succeeded - if task._upstream_list: + if not task._upstream_list or t.trigger_rule == TR.DUMMY: + return True + else: upstream_task_ids = [t.task_id for t in task._upstream_list] qry = ( session @@ -684,6 +687,10 @@ class TaskInstance(Base): case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), func.coalesce(func.sum( case([(TI.state == State.SKIPPED, 1)], else_=0)), 0), + func.coalesce(func.sum( + case([(TI.state == State.FAILED, 1)], else_=0)), 0), + func.coalesce(func.sum( + case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0), func.count(TI.task_id), ) .filter( @@ -695,27 +702,36 @@ class TaskInstance(Base): State.UPSTREAM_FAILED, State.SKIPPED]), ) ) - successes, skipped, done = qry[0] + successes, skipped, failed, upstream_failed, done = qry.first() if flag_upstream_failed: if skipped: self.state = State.SKIPPED - self.start_date = datetime.now() - self.end_date = 
datetime.now() - session.merge(self) - elif successes < done >= len(task._upstream_list): self.state = State.UPSTREAM_FAILED - self.start_date = datetime.now() - self.end_date = datetime.now() - session.merge(self) - if successes < len(task._upstream_list): - return False + self.start_date = datetime.now() + self.end_date = datetime.now() + session.merge(self) + + if t.trigger_rule == TR.ONE_SUCCESS and successes > 0: + return True + elif (t.trigger_rule == TR.ONE_FAILED and + (failed + upstream_failed) > 0): + return True + elif (t.trigger_rule == TR.ALL_SUCCESS and + successes == len(task._upstream_list)): + return True + elif (t.trigger_rule == TR.ALL_FAILED and + failed + upstream_failed == len(task._upstream_list)): + return True + elif (t.trigger_rule == TR.ALL_DONE and + done == len(task._upstream_list)): + return True if not main_session: session.commit() session.close() - return True + return False def __repr__(self): return ( @@ -1212,6 +1228,14 @@ class BaseOperator(object): :param on_success_callback: much like the ``on_failure_callback`` excepts that it is executed when the task succeeds. :type on_success_callback: callable + :param trigger_rule: defines the rule by which dependencies are applied + for the task to get triggered. Options are: + ``{ all_success | all_failed | all_done | one_success | + one_failed | dummy}`` + default is ``all_success``. 
Options can be set as string or + using the constants defined in the static class + ``airflow.utils.TriggerRule`` + :type trigger_rule: str """ # For derived classes to define which fields will get jinjaified @@ -1249,6 +1273,7 @@ class BaseOperator(object): on_failure_callback=None, on_success_callback=None, on_retry_callback=None, + trigger_rule=TriggerRule.ALL_SUCCESS, *args, **kwargs): @@ -1261,6 +1286,7 @@ class BaseOperator(object): self.email_on_failure = email_on_failure self.start_date = start_date self.end_date = end_date + self.trigger_rule = trigger_rule self.depends_on_past = depends_on_past self.wait_for_downstream = wait_for_downstream if wait_for_downstream: diff --git a/airflow/utils.py b/airflow/utils.py index 233f5fbe72..69c95fb551 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -36,6 +36,15 @@ class AirflowSensorTimeout(Exception): pass +class TriggerRule(object): + ALL_SUCCESS = 'all_success' + ALL_FAILED = 'all_failed' + ALL_DONE = 'all_done' + ONE_SUCCESS = 'one_success' + ONE_FAILED = 'one_failed' + DUMMY = 'dummy' + + class State(object): """ Static class with task instance states constants and color method to From 178a051b13e27b01b2b1b5008bba5193003e73e9 Mon Sep 17 00:00:00 2001 From: Maxime Date: Sat, 15 Aug 2015 14:01:34 +0000 Subject: [PATCH 2/4] Testing/debugging --- airflow/models.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index ef9f59519e..58a3355612 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -676,7 +676,7 @@ class TaskInstance(Base): return False # Checking that all upstream dependencies have succeeded - if not task._upstream_list or t.trigger_rule == TR.DUMMY: + if not task._upstream_list or task.trigger_rule == TR.DUMMY: return True else: upstream_task_ids = [t.task_id for t in task._upstream_list] @@ -713,18 +713,18 @@ class TaskInstance(Base): self.end_date = datetime.now() session.merge(self) - if t.trigger_rule == TR.ONE_SUCCESS 
and successes > 0: + if task.trigger_rule == TR.ONE_SUCCESS and successes > 0: return True - elif (t.trigger_rule == TR.ONE_FAILED and + elif (task.trigger_rule == TR.ONE_FAILED and (failed + upstream_failed) > 0): return True - elif (t.trigger_rule == TR.ALL_SUCCESS and + elif (task.trigger_rule == TR.ALL_SUCCESS and successes == len(task._upstream_list)): return True - elif (t.trigger_rule == TR.ALL_FAILED and + elif (task.trigger_rule == TR.ALL_FAILED and failed + upstream_failed == len(task._upstream_list)): return True - elif (t.trigger_rule == TR.ALL_DONE and + elif (task.trigger_rule == TR.ALL_DONE and done == len(task._upstream_list)): return True From 0919ed0812b8aed7bff6ad9a1e1ec264d2028f66 Mon Sep 17 00:00:00 2001 From: Maxime Date: Sat, 15 Aug 2015 14:28:10 +0000 Subject: [PATCH 3/4] Doc entry --- docs/concepts.rst | 25 +++++++++++++++++++++++++ docs/faq.rst | 7 +++++++ 2 files changed, 32 insertions(+) diff --git a/docs/concepts.rst b/docs/concepts.rst index ca799d59ec..2ac563c711 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -206,3 +206,28 @@ detailing the list of tasks that missed their SLA. The event is also recorded in the database and made available in the web UI under ``Browse->Missed SLAs`` where events can be analyzed and documented. + +Trigger Rules +''''''''''''' + +Though the normal workflow behavior is to trigger tasks when all their +directly upstream tasks have succeeded, Airflow allows for more complex +dependency settings. + +All operators have a ``trigger_rule`` argument which defines the rule by which +the generated task get triggered. The default value for ``trigger_rule`` is +``all_success`` and can be defined as "trigger this task when all directly +upstream tasks have succeeded". 
All other rules described here are based +on direct parent tasks and are values that can be passed to any operator +while creating tasks: + +* ``all_success``: (default) all parents have succeeded +* ``all_failed``: all parents are in a ``failed`` or ``upstream_failed`` state +* ``all_done``: all parents are done with their execution +* ``one_failed``: at least one parent has failed +* ``one_success``: at least one parent has succeeded +* ``dummy``: dependencies are just for show, trigger at will + +Note that these can be used in conjunction with ``depends_on_past`` (boolean) +that, when set to ``True``, keeps a task from getting triggered if the +previous schedule for the task hasn't succeeded. diff --git a/docs/faq.rst b/docs/faq.rst index f8cf553f94..0c572d29f6 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -33,3 +33,10 @@ Here are some of the common causes: You may also want to read the Scheduler section of the docs and make sure you fully understand how it proceeds. + + +**How do I trigger tasks based on another task's failure?** + +Check out the ``Trigger Rule`` section in the Concepts section of the +documentation + From da606caaedd37d77ce1e9bb38221d7a2e4a78611 Mon Sep 17 00:00:00 2001 From: Maxime Date: Fri, 21 Aug 2015 16:53:45 +0000 Subject: [PATCH 4/4] Clarifying docs entry for trigger_rule --- docs/concepts.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index 2ac563c711..a0bc6de3a9 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -224,8 +224,8 @@ while creating tasks: * ``all_success``: (default) all parents have succeeded * ``all_failed``: all parents are in a ``failed`` or ``upstream_failed`` state * ``all_done``: all parents are done with their execution -* ``one_failed``: at least one parent has failed -* ``one_success``: at least one parent has succeeded +* ``one_failed``: fires as soon as at least one parent has failed, it does not wait for all parents to be done +* 
``one_success``: fires as soon as at least one parent succeeds; it does not wait for all parents to be done * ``dummy``: dependencies are just for show, trigger at will Note that these can be used in conjunction with ``depends_on_past`` (boolean)