[AIRFLOW-3627] Refine performance of /task_stats (#4433)

This commit is contained in:
Xiaodong 2019-01-09 08:24:52 +08:00 коммит произвёл Kaxil Naik
Родитель 9972c4fa98
Коммит 0222a36ab1
2 изменённых файлов: 24 добавлений и 22 удалений

Просмотреть файл

@ -594,21 +594,24 @@ class Airflow(BaseView):
dag_ids = session.query(Dag.dag_id)
LastDagRun = (
session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING)
.filter(Dag.is_active == True) # noqa: E712
.filter(Dag.is_subdag == False) # noqa: E712
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
session.query(
DagRun.dag_id,
sqla.func.max(DagRun.execution_date).label('execution_date')
)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING,
Dag.is_active,
Dag.is_subdag == False) # noqa: E712
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
)
RunningDagRun = (
session.query(DagRun.dag_id, DagRun.execution_date)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state == State.RUNNING)
.filter(Dag.is_active == True) # noqa: E712
.filter(Dag.is_subdag == False) # noqa: E712
.subquery('running_dag_run')
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state == State.RUNNING,
Dag.is_active,
Dag.is_subdag == False) # noqa: E712
.subquery('running_dag_run')
)
# Select all task_instances from active dag_runs.

Просмотреть файл

@ -337,19 +337,18 @@ class Airflow(AirflowBaseView):
LastDagRun = (
session.query(
DagRun.dag_id,
sqla.func.max(DagRun.execution_date).label('execution_date'))
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING)
.filter(Dag.is_active == True) # noqa
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
DagRun.dag_id,
sqla.func.max(DagRun.execution_date).label('execution_date')
)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING, Dag.is_active)
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
)
RunningDagRun = (
session.query(DagRun.dag_id, DagRun.execution_date)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state == State.RUNNING)
.filter(Dag.is_active == True) # noqa
.filter(DagRun.state == State.RUNNING, Dag.is_active)
.subquery('running_dag_run')
)
@ -1407,7 +1406,7 @@ class Airflow(AirflowBaseView):
TF = models.TaskFail
ti_fails = (
session.query(TF)
.filter(TF.dag_id == dag.dag_id, # noqa
.filter(TF.dag_id == dag.dag_id,
TF.execution_date >= min_date,
TF.execution_date <= base_date,
TF.task_id.in_([t.task_id for t in dag.tasks]))