From 6c57e83627ebf2955be6a8543dba31b784aca732 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Fri, 27 Mar 2015 20:28:58 -0700 Subject: [PATCH] Mark tasks as successful (false positive) from the UI --- airflow/models.py | 3 +- airflow/www/app.py | 75 +++++++++++++++++++++++--- airflow/www/templates/airflow/dag.html | 25 +++++++++ 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index ac90b86eab..060d3cbfd5 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -319,10 +319,11 @@ class TaskInstance(Base): Index('ti_state_lkp', dag_id, task_id, execution_date, state), ) - def __init__(self, task, execution_date, job=None): + def __init__(self, task, execution_date, state=None, job=None): self.dag_id = task.dag_id self.task_id = task.task_id self.execution_date = execution_date + self.state = state self.task = task self.try_number = 1 self.unixname = getpass.getuser() diff --git a/airflow/www/app.py b/airflow/www/app.py index a4dc6d33fd..af30201d2f 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -811,21 +811,22 @@ class Airflow(BaseView): @expose('/action') def action(self): - session = settings.Session() action = request.args.get('action') dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') origin = request.args.get('origin') dag = dagbag.dags[dag_id] + task = dag.get_task(task_id) + + execution_date = request.args.get('execution_date') + execution_date = dateutil.parser.parse(execution_date) + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" if action == 'clear': - task_id = request.args.get('task_id') - execution_date = request.args.get('execution_date') - execution_date = dateutil.parser.parse(execution_date) future = request.args.get('future') == "true" past = request.args.get('past') == "true" - upstream = request.args.get('upstream') == "true" - downstream = request.args.get('downstream') == "true" - confirmed = request.args.get('confirmed') == "true" dag = dag.sub_dag( task_regex=r"^{0}$".format(task_id), @@ -860,8 +861,68 @@ class Airflow(BaseView): "to clear:"), details=details,) + return response + elif action == 'success': + # Flagging tasks as successful + session = settings.Session() + task_ids = [task_id] + if downstream: + task_ids += [ + t.task_id + for t in task.get_flat_relatives(upstream=False)] + if upstream: + task_ids += [ + t.task_id + for t in task.get_flat_relatives(upstream=True)] + TI = models.TaskInstance + tis = session.query(TI).filter( + TI.dag_id==dag_id, + TI.execution_date==execution_date, + TI.task_id.in_(task_ids)).all() + + if confirmed: + + updated_task_ids = [] + for ti in tis: + updated_task_ids.append(ti.task_id) + ti.state = State.SUCCESS + + session.commit() + + to_insert = list(set(task_ids) - set(updated_task_ids)) + for task_id in to_insert: + ti = TI( + task=dag.get_task(task_id), + execution_date=execution_date, + state=State.SUCCESS) + session.add(ti) + session.commit() + session.commit() session.close() + flash("Marked success on {} task instances".format( + len(task_ids))) + + return redirect(origin) + else: + if not task_ids: + flash("No task instances to mark as successful", 'error') + response = redirect(origin) + else: + tis = [] + for task_id in task_ids: + tis.append(TI( + task=dag.get_task(task_id), + execution_date=execution_date, + state=State.SUCCESS)) + details = "\n".join([str(t) for t in tis]) + + response = self.render( + 'airflow/confirm.html', + message=( + "Here's the list of task instances you are about " + "to mark as successful:"), + details=details,) return response @expose('/tree') diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 3afb723a23..854de7828a 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -100,6 +100,18 @@
+ + + + + +