Make scheduler_dag_execution_timing grok dynamic start date of elastic dag (#8952)

The scheduler_dag_execution_timing script wants to run _n_ dag runs to
completion. However, since the start date of those dags is dynamic
(`now - delta`), we can't pre-compute the execution_dates as we did
before. (This is because the execution_date of the very first dag run
would be the `now()` of the parser process, but if we tried to
pre-compute it in the benchmark process we would see a different value
of `now()`.)
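
To make the mismatch concrete, here is a minimal sketch (not part of the
commit; the delay and delta values are purely illustrative) of how a
pre-computed execution_date diverges from the one the parser process
later derives:

    from datetime import datetime, timedelta
    import time

    delta = timedelta(days=1)

    # The benchmark process pre-computes the expected first execution_date...
    precomputed = datetime.now() - delta

    # ...but the dag file is parsed later, in a separate process, which
    # evaluates `now() - delta` again and so derives a different timestamp.
    time.sleep(2)
    parsed = datetime.now() - delta

    assert precomputed != parsed  # the pre-computed key never matches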

This PR changes it to instead watch for the first _n_ dag runs of each
dag to complete. This should make it work with more dags while
requiring fewer changes to them.
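
The bookkeeping this boils down to is a per-dag countdown instead of a
set of pre-computed run keys. A simplified sketch (the helper names here
are illustrative; the real logic lives in ShortCircuitExecutorMixin in
the diff below):

    from argparse import Namespace

    def build_watch_table(dag_ids_to_watch, num_runs):
        # One entry per watched dag: how many runs we are still waiting for,
        # plus a cache of DagRun rows keyed by execution_date.
        return {
            dag_id: Namespace(waiting_for=num_runs, runs={})
            for dag_id in dag_ids_to_watch
        }

    def note_run_completed(dags_to_watch, dag_id, execution_date):
        # Called once every task in a given DagRun has succeeded.
        dags_to_watch[dag_id].runs.pop(execution_date, None)
        dags_to_watch[dag_id].waiting_for -= 1
        if dags_to_watch[dag_id].waiting_for == 0:
            dags_to_watch.pop(dag_id)
        # An empty table means every watched dag has completed num_runs runs,
        # so the scheduler can be shut down.
        return not dags_to_watch
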
Author: Ash Berlin-Taylor, 2020-05-21 15:56:54 +01:00 (committed by GitHub)
Parent: b26b3ca978
Commit: 113982b25d
No key found matching this signature. GPG key ID: 4AEE18F83AFDEB23
1 changed file with 40 additions and 31 deletions


@@ -21,6 +21,8 @@ import os
 import statistics
 import sys
 import time
+from argparse import Namespace
+from operator import attrgetter
 import click
@@ -31,17 +33,23 @@ class ShortCircuitExecutorMixin:
     '''
     Mixin class to manage the scheduler state during the performance test run.
     '''
-    def __init__(self, stop_when_these_completed):
+    def __init__(self, dag_ids_to_watch, num_runs):
         super().__init__()
-        self.reset(stop_when_these_completed)
+        self.num_runs_per_dag = num_runs
+        self.reset(dag_ids_to_watch)
-    def reset(self, stop_when_these_completed):
+    def reset(self, dag_ids_to_watch):
         '''
         Capture the value that will determine when the scheduler is reset.
         '''
-        self.stop_when_these_completed = {
-            # We want to store the dag run here, but we don't have them until they are created.
-            key: None for key in stop_when_these_completed
+        self.dags_to_watch = {
+            dag_id: Namespace(
+                waiting_for=self.num_runs_per_dag,
+                # A "cache" of DagRun row, so we don't have to look it up each
+                # time. This is to try and reduce the impact of our
+                # benchmarking code on runtime,
+                runs={}
+            ) for dag_id in dag_ids_to_watch
         }
     def change_state(self, key, state):
@@ -53,26 +61,33 @@ class ShortCircuitExecutorMixin:
         super().change_state(key, state)
         dag_id, _, execution_date, __ = key
-        run_key = (dag_id, execution_date.timestamp())
-        if run_key in self.stop_when_these_completed:
+        if dag_id not in self.dags_to_watch:
+            return
-            if self.stop_when_these_completed[run_key] is None:
-                import airflow.models
+        # This fn is called before the DagRun state is updated, so we can't
+        # check the DR.state - so instead we need to check the state of the
+        # tasks in that run
-                # We are interested in this run, but don't yet have the record for it.
-                run = airflow.models.DagRun.find(dag_id=dag_id, execution_date=execution_date)[0]
-                self.stop_when_these_completed[run_key] = run
-            else:
-                run = self.stop_when_these_completed[run_key]
+        run = self.dags_to_watch[dag_id].runs.get(execution_date)
+        if not run:
+            import airflow.models
+            # odd `list()` is to work across Airflow versions.
+            run = list(airflow.models.DagRun.find(dag_id=dag_id, execution_date=execution_date))[0]
+            self.dags_to_watch[dag_id].runs[execution_date] = run
-            if run and all(t.state == State.SUCCESS for t in run.get_task_instances()):
-                self.stop_when_these_completed.pop(run_key)
+        if run and all(t.state == State.SUCCESS for t in run.get_task_instances()):
+            self.dags_to_watch[dag_id].runs.pop(execution_date)
+            self.dags_to_watch[dag_id].waiting_for -= 1
-                if not self.stop_when_these_completed:
-                    self.log.warning("STOPPING SCHEDULER -- all runs complete")
-                    self.scheduler_job.processor_agent._done = True  # pylint: disable=protected-access
-                else:
-                    self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed))
+            if self.dags_to_watch[dag_id].waiting_for == 0:
+                self.dags_to_watch.pop(dag_id)
+            if not self.dags_to_watch:
+                self.log.warning("STOPPING SCHEDULER -- all runs complete")
+                self.scheduler_job.processor_agent._done = True  # pylint: disable=protected-access
+                return
+            self.log.warning("WAITING ON %d RUNS",
+                             sum(map(attrgetter('waiting_for'), self.dags_to_watch.values())))
 def get_executor_under_test():
@@ -193,6 +208,7 @@ def main(num_runs, repeat, pre_create_dag_runs, dag_ids):  # pylint: disable=too
     # Set this so that dags can dynamically configure their end_date
     os.environ['AIRFLOW_BENCHMARK_MAX_DAG_RUNS'] = str(num_runs)
+    os.environ['PERF_MAX_RUNS'] = str(num_runs)
     if pre_create_dag_runs:
         os.environ['AIRFLOW__SCHEDULER__USE_JOB_SCHEDULE'] = 'False'
@@ -204,7 +220,6 @@ def main(num_runs, repeat, pre_create_dag_runs, dag_ids):  # pylint: disable=too
     dagbag = DagBag()
     dags = []
-    dag_run_keys = []
     with db.create_session() as session:
         pause_all_dags(session)
@@ -216,14 +231,8 @@ def main(num_runs, repeat, pre_create_dag_runs, dag_ids):  # pylint: disable=too
             next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks))
-            # Compute the execution_dates we want to wait on. We only "need" the last
-            # one, but having them all means we get a more useful "count down" in the
-            # log
-            dag_run_keys.append((dag.dag_id, next_run_date.timestamp()))
             for _ in range(num_runs - 1):
                 next_run_date = dag.following_schedule(next_run_date)
-                dag_run_keys.append((dag.dag_id, next_run_date.timestamp()))
             end_date = dag.end_date or dag.default_args.get('end_date')
             if end_date != next_run_date:
@@ -238,7 +247,7 @@ def main(num_runs, repeat, pre_create_dag_runs, dag_ids):  # pylint: disable=too
     ShortCircutExecutor = get_executor_under_test()
-    executor = ShortCircutExecutor(stop_when_these_completed=dag_run_keys)
+    executor = ShortCircutExecutor(dag_ids_to_watch=dag_ids, num_runs=num_runs)
     scheduler_job = SchedulerJob(dag_ids=dag_ids, do_pickle=False, executor=executor)
     executor.scheduler_job = scheduler_job
@@ -269,7 +278,7 @@ def main(num_runs, repeat, pre_create_dag_runs, dag_ids):  # pylint: disable=too
                 for dag in dags:
                     reset_dag(dag, session)
-                executor.reset(dag_run_keys)
+                executor.reset(dag_ids)
                 scheduler_job = SchedulerJob(dag_ids=dag_ids, do_pickle=False, executor=executor)
                 executor.scheduler_job = scheduler_job