Merge pull request #160 from mistercrunch/mark_success
Mark tasks as successful (false positive) from the UI
This commit is contained in:
Коммит
ecc20e0033
|
@ -319,10 +319,11 @@ class TaskInstance(Base):
|
||||||
Index('ti_state_lkp', dag_id, task_id, execution_date, state),
|
Index('ti_state_lkp', dag_id, task_id, execution_date, state),
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, task, execution_date, job=None):
|
def __init__(self, task, execution_date, state=None, job=None):
|
||||||
self.dag_id = task.dag_id
|
self.dag_id = task.dag_id
|
||||||
self.task_id = task.task_id
|
self.task_id = task.task_id
|
||||||
self.execution_date = execution_date
|
self.execution_date = execution_date
|
||||||
|
self.state = state
|
||||||
self.task = task
|
self.task = task
|
||||||
self.try_number = 1
|
self.try_number = 1
|
||||||
self.unixname = getpass.getuser()
|
self.unixname = getpass.getuser()
|
||||||
|
|
|
@ -811,21 +811,22 @@ class Airflow(BaseView):
|
||||||
|
|
||||||
@expose('/action')
|
@expose('/action')
|
||||||
def action(self):
|
def action(self):
|
||||||
session = settings.Session()
|
|
||||||
action = request.args.get('action')
|
action = request.args.get('action')
|
||||||
dag_id = request.args.get('dag_id')
|
dag_id = request.args.get('dag_id')
|
||||||
|
task_id = request.args.get('task_id')
|
||||||
origin = request.args.get('origin')
|
origin = request.args.get('origin')
|
||||||
dag = dagbag.dags[dag_id]
|
dag = dagbag.dags[dag_id]
|
||||||
|
task = dag.get_task(task_id)
|
||||||
|
|
||||||
|
execution_date = request.args.get('execution_date')
|
||||||
|
execution_date = dateutil.parser.parse(execution_date)
|
||||||
|
confirmed = request.args.get('confirmed') == "true"
|
||||||
|
upstream = request.args.get('upstream') == "true"
|
||||||
|
downstream = request.args.get('downstream') == "true"
|
||||||
|
|
||||||
if action == 'clear':
|
if action == 'clear':
|
||||||
task_id = request.args.get('task_id')
|
|
||||||
execution_date = request.args.get('execution_date')
|
|
||||||
execution_date = dateutil.parser.parse(execution_date)
|
|
||||||
future = request.args.get('future') == "true"
|
future = request.args.get('future') == "true"
|
||||||
past = request.args.get('past') == "true"
|
past = request.args.get('past') == "true"
|
||||||
upstream = request.args.get('upstream') == "true"
|
|
||||||
downstream = request.args.get('downstream') == "true"
|
|
||||||
confirmed = request.args.get('confirmed') == "true"
|
|
||||||
|
|
||||||
dag = dag.sub_dag(
|
dag = dag.sub_dag(
|
||||||
task_regex=r"^{0}$".format(task_id),
|
task_regex=r"^{0}$".format(task_id),
|
||||||
|
@ -860,8 +861,68 @@ class Airflow(BaseView):
|
||||||
"to clear:"),
|
"to clear:"),
|
||||||
details=details,)
|
details=details,)
|
||||||
|
|
||||||
|
return response
|
||||||
|
elif action == 'success':
|
||||||
|
# Flagging tasks as successful
|
||||||
|
session = settings.Session()
|
||||||
|
task_ids = [task_id]
|
||||||
|
if downstream:
|
||||||
|
task_ids += [
|
||||||
|
t.task_id
|
||||||
|
for t in task.get_flat_relatives(upstream=False)]
|
||||||
|
if upstream:
|
||||||
|
task_ids += [
|
||||||
|
t.task_id
|
||||||
|
for t in task.get_flat_relatives(upstream=True)]
|
||||||
|
TI = models.TaskInstance
|
||||||
|
tis = session.query(TI).filter(
|
||||||
|
TI.dag_id==dag_id,
|
||||||
|
TI.execution_date==execution_date,
|
||||||
|
TI.task_id.in_(task_ids)).all()
|
||||||
|
|
||||||
|
if confirmed:
|
||||||
|
|
||||||
|
updated_task_ids = []
|
||||||
|
for ti in tis:
|
||||||
|
updated_task_ids.append(ti.task_id)
|
||||||
|
ti.state = State.SUCCESS
|
||||||
|
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
to_insert = list(set(task_ids) - set(updated_task_ids))
|
||||||
|
for task_id in to_insert:
|
||||||
|
ti = TI(
|
||||||
|
task=dag.get_task(task_id),
|
||||||
|
execution_date=execution_date,
|
||||||
|
state=State.SUCCESS)
|
||||||
|
session.add(ti)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
session.commit()
|
session.commit()
|
||||||
session.close()
|
session.close()
|
||||||
|
flash("Marked success on {} task instances".format(
|
||||||
|
len(task_ids)))
|
||||||
|
|
||||||
|
return redirect(origin)
|
||||||
|
else:
|
||||||
|
if not task_ids:
|
||||||
|
flash("No task instances to mark as successful", 'error')
|
||||||
|
response = redirect(origin)
|
||||||
|
else:
|
||||||
|
tis = []
|
||||||
|
for task_id in task_ids:
|
||||||
|
tis.append(TI(
|
||||||
|
task=dag.get_task(task_id),
|
||||||
|
execution_date=execution_date,
|
||||||
|
state=State.SUCCESS))
|
||||||
|
details = "\n".join([str(t) for t in tis])
|
||||||
|
|
||||||
|
response = self.render(
|
||||||
|
'airflow/confirm.html',
|
||||||
|
message=(
|
||||||
|
"Here's the list of task instances you are about "
|
||||||
|
"to mark as successful:"),
|
||||||
|
details=details,)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@expose('/tree')
|
@expose('/tree')
|
||||||
|
|
|
@ -100,6 +100,18 @@
|
||||||
</button>
|
</button>
|
||||||
</span>
|
</span>
|
||||||
<hr/>
|
<hr/>
|
||||||
|
<button id="btn_success" type="button" class="btn btn-primary">
|
||||||
|
Mark Success
|
||||||
|
</button>
|
||||||
|
<span class="btn-group">
|
||||||
|
<button id="btn_success_upstream"
|
||||||
|
type="button" class="btn" data-toggle="button">Upstream</button>
|
||||||
|
<button id="btn_success_downstream"
|
||||||
|
type="button" class="btn" data-toggle="button">
|
||||||
|
Downstream
|
||||||
|
</button>
|
||||||
|
</span>
|
||||||
|
<hr/>
|
||||||
<span>
|
<span>
|
||||||
<button id="btn_graph" type="button" class="btn">
|
<button id="btn_graph" type="button" class="btn">
|
||||||
Graph View
|
Graph View
|
||||||
|
@ -201,6 +213,19 @@
|
||||||
window.location = url;
|
window.location = url;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$("#btn_success").click(function(){
|
||||||
|
url = "{{ url_for('airflow.action') }}" +
|
||||||
|
"?action=success" +
|
||||||
|
"&task_id=" + task_id +
|
||||||
|
"&dag_id=" + dag_id +
|
||||||
|
"&upstream=" + $('#btn_success_upstream').hasClass('active') +
|
||||||
|
"&downstream=" + $('#btn_success_downstream').hasClass('active') +
|
||||||
|
"&execution_date=" + execution_date +
|
||||||
|
"&origin=" + encodeURIComponent(window.location);
|
||||||
|
|
||||||
|
window.location = url;
|
||||||
|
});
|
||||||
|
|
||||||
$("#btn_gantt").click(function(){
|
$("#btn_gantt").click(function(){
|
||||||
url = "{{ url_for('airflow.gantt') }}" +
|
url = "{{ url_for('airflow.gantt') }}" +
|
||||||
"?dag_id=" + dag_id +
|
"?dag_id=" + dag_id +
|
||||||
|
|
Загрузка…
Ссылка в новой задаче