Pass SchedulerJob.subdir to Dagbag (#13291)

Because `SchedulerJob.subdir` was not used in Airflow 2.0, whenever SchedulerJob() would be initialized, it would serialize all the DAGs to the DB from settings.DAG_FOLDER.

```
root@b11b273fdffb:/opt/airflow# pytest tests/jobs/test_scheduler_job.py -k test_dag_file_processor_process_task_instances --durations=0

Before:

 9 passed, 120 deselected, 2 warnings in 22.11s =======================================================================================================

After:

 9 passed, 120 deselected, 2 warnings in 10.56s =======================================================================================================

```

(cherry picked from commit 3f52f1aca4)
This commit is contained in:
Kaxil Naik 2020-12-24 17:01:01 +00:00
Родитель ea5af5bf7a
Коммит 168a83d5ee
3 изменённых файлов: 47 добавлений и 48 удалений

Просмотреть файл

@ -733,7 +733,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
self.max_tis_per_query: int = conf.getint('scheduler', 'max_tis_per_query')
self.processor_agent: Optional[DagFileProcessorAgent] = None
self.dagbag = DagBag(read_dags_from_db=True)
self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True)
def register_signals(self) -> None:
"""Register signals that stop child processes"""

Просмотреть файл

@ -396,7 +396,7 @@ class TestDagFileProcessor(unittest.TestCase):
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
scheduler.dagbag.bag_dag(dag, root_dag=dag)
dag.clear()
@ -453,7 +453,7 @@ class TestDagFileProcessor(unittest.TestCase):
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
scheduler.dagbag.bag_dag(dag, root_dag=dag)
dag.clear()
@ -512,7 +512,7 @@ class TestDagFileProcessor(unittest.TestCase):
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
scheduler.dagbag.bag_dag(dag, root_dag=dag)
dag.clear()
@ -545,7 +545,7 @@ class TestDagFileProcessor(unittest.TestCase):
dag = DAG(dag_id='test_scheduler_add_new_task', start_date=DEFAULT_DATE)
BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo test')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.dagbag.bag_dag(dag, root_dag=dag)
# Since we don't want to store the code for the DAG defined in this file
@ -556,7 +556,7 @@ class TestDagFileProcessor(unittest.TestCase):
orm_dag = session.query(DagModel).get(dag.dag_id)
assert orm_dag is not None
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
dag = scheduler.dagbag.get_dag('test_scheduler_add_new_task', session=session)
scheduler._create_dag_runs([orm_dag], session)
@ -599,7 +599,7 @@ class TestDagFileProcessor(unittest.TestCase):
session.close()
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
scheduler.dagbag.bag_dag(dag, root_dag=dag)
dag.clear()
@ -707,7 +707,7 @@ class TestDagFileProcessor(unittest.TestCase):
dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
dagbag.sync_to_db()
scheduler_job = SchedulerJob()
scheduler_job = SchedulerJob(subdir=os.devnull)
scheduler_job.processor_agent = mock.MagicMock()
dag = scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
@ -946,7 +946,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dagmodel = DagModel(
@ -984,7 +984,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1022,7 +1022,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1059,7 +1059,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1111,7 +1111,7 @@ class TestSchedulerJob(unittest.TestCase):
task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b')
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1219,7 +1219,7 @@ class TestSchedulerJob(unittest.TestCase):
task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist")
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1252,7 +1252,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1279,7 +1279,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1340,7 +1340,7 @@ class TestSchedulerJob(unittest.TestCase):
task3 = DummyOperator(dag=dag, task_id='dummy3')
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
dag_id=dag_id,
@ -1475,7 +1475,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
date = DEFAULT_DATE
@ -1521,7 +1521,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id=task_id_1)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1560,7 +1560,7 @@ class TestSchedulerJob(unittest.TestCase):
task2 = DummyOperator(dag=dag, task_id=task_id_2)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
# create first dag run with 1 running and 1 queued
@ -1642,7 +1642,7 @@ class TestSchedulerJob(unittest.TestCase):
task2 = DummyOperator(dag=dag, task_id=task_id_2)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dag_model = DagModel(
@ -1802,7 +1802,7 @@ class TestSchedulerJob(unittest.TestCase):
# If there's no left over task in executor.queued_tasks, nothing happens
session = settings.Session()
scheduler_job = SchedulerJob()
scheduler_job = SchedulerJob(subdir=os.devnull)
mock_logger = mock.MagicMock()
test_executor = MockExecutor(do_update=False)
scheduler_job.executor = test_executor
@ -1959,7 +1959,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='dummy', dag=dag, owner='airflow')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.dagbag.bag_dag(dag, root_dag=dag)
scheduler.dagbag.sync_to_db()
@ -1970,7 +1970,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler._create_dag_runs([orm_dag], session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
@ -2022,7 +2022,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='dummy', dag=dag, owner='airflow')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.dagbag.bag_dag(dag, root_dag=dag)
scheduler.dagbag.sync_to_db()
@ -2032,7 +2032,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler._create_dag_runs([orm_dag], session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
@ -2082,7 +2082,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='dummy', dag=dag, owner='airflow')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.Mock()
scheduler.processor_agent.send_callback_to_execute = mock.Mock()
scheduler._send_sla_callbacks_to_processor = mock.Mock()
@ -2137,7 +2137,7 @@ class TestSchedulerJob(unittest.TestCase):
BashOperator(task_id='test_task', dag=dag, owner='airflow', bash_command='echo hi')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.Mock()
scheduler.processor_agent.send_callback_to_execute = mock.Mock()
scheduler._send_dag_callbacks_to_processor = mock.Mock()
@ -2189,9 +2189,8 @@ class TestSchedulerJob(unittest.TestCase):
# Re-create the DAG, but remove the task
dag = DAG(dag_id='test_scheduler_do_not_schedule_removed_task', start_date=DEFAULT_DATE)
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
self.assertEqual([], res)
@ -2708,7 +2707,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id='test_verify_integrity_if_dag_not_changed', start_date=DEFAULT_DATE)
BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo hi')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.dagbag.bag_dag(dag, root_dag=dag)
scheduler.dagbag.sync_to_db()
@ -2716,7 +2715,7 @@ class TestSchedulerJob(unittest.TestCase):
orm_dag = session.query(DagModel).get(dag.dag_id)
assert orm_dag is not None
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
dag = scheduler.dagbag.get_dag('test_verify_integrity_if_dag_not_changed', session=session)
scheduler._create_dag_runs([orm_dag], session)
@ -2761,7 +2760,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id='test_verify_integrity_if_dag_changed', start_date=DEFAULT_DATE)
BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo hi')
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.dagbag.bag_dag(dag, root_dag=dag)
scheduler.dagbag.sync_to_db()
@ -2769,7 +2768,7 @@ class TestSchedulerJob(unittest.TestCase):
orm_dag = session.query(DagModel).get(dag.dag_id)
assert orm_dag is not None
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.processor_agent = mock.MagicMock()
dag = scheduler.dagbag.get_dag('test_verify_integrity_if_dag_changed', session=session)
scheduler._create_dag_runs([orm_dag], session)
@ -3274,7 +3273,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id = dag_id + '_task'
DummyOperator(task_id=task_id, dag=dag)
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
dr1 = dag.create_dagrun(
@ -3300,7 +3299,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id = dag_id + '_task'
DummyOperator(task_id=task_id, dag=dag)
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
session.add(scheduler)
session.flush()
@ -3329,7 +3328,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id = dag_id + '_task'
task = DummyOperator(task_id=task_id, dag=dag)
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
@ -3350,7 +3349,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id = dag_id + '_task'
DummyOperator(task_id=task_id, dag=dag)
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
session.add(scheduler)
session.flush()
@ -3380,7 +3379,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id = dag_id + '_task'
DummyOperator(task_id=task_id, dag=dag)
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
session = settings.Session()
session.add(scheduler)
session.flush()
@ -3409,13 +3408,13 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='task1', dag=dag)
DummyOperator(task_id='task2', dag=dag)
scheduler_job = SchedulerJob()
scheduler_job = SchedulerJob(subdir=os.devnull)
session = settings.Session()
scheduler_job.state = State.RUNNING
scheduler_job.latest_heartbeat = timezone.utcnow()
session.add(scheduler_job)
old_job = SchedulerJob()
old_job = SchedulerJob(subdir=os.devnull)
old_job.state = State.RUNNING
old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
session.add(old_job)
@ -3458,7 +3457,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='task1', dag=dag)
with patch.object(settings, "CHECK_SLAS", False):
scheduler_job = SchedulerJob()
scheduler_job = SchedulerJob(subdir=os.devnull)
mock_agent = mock.MagicMock()
scheduler_job.processor_agent = mock_agent
@ -3473,7 +3472,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='task1', dag=dag)
with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = SchedulerJob()
scheduler_job = SchedulerJob(subdir=os.devnull)
mock_agent = mock.MagicMock()
scheduler_job.processor_agent = mock_agent
@ -3488,7 +3487,7 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='task1', dag=dag, sla=timedelta(seconds=60))
with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = SchedulerJob()
scheduler_job = SchedulerJob(subdir=os.devnull)
mock_agent = mock.MagicMock()
scheduler_job.processor_agent = mock_agent
@ -3568,7 +3567,7 @@ class TestSchedulerJob(unittest.TestCase):
dag.sync_to_db(session=session) # Update the date fields
job = SchedulerJob()
job = SchedulerJob(subdir=os.devnull)
job.executor = MockExecutor(do_update=False)
job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
@ -3612,7 +3611,7 @@ class TestSchedulerJob(unittest.TestCase):
dag.sync_to_db(session=session) # Update the date fields
job = SchedulerJob()
job = SchedulerJob(subdir=os.devnull)
job.executor = MockExecutor(do_update=False)
job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
@ -3661,7 +3660,7 @@ class TestSchedulerJob(unittest.TestCase):
dag.sync_to_db(session=session) # Update the date fields
job = SchedulerJob()
job = SchedulerJob(subdir=os.devnull)
job.executor = MockExecutor(do_update=False)
job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

Просмотреть файл

@ -1945,7 +1945,7 @@ class TestTaskInstance(unittest.TestCase):
for upstream, downstream in dependencies.items():
dag.set_dependency(upstream, downstream)
scheduler = SchedulerJob()
scheduler = SchedulerJob(subdir=os.devnull)
scheduler.dagbag.bag_dag(dag, root_dag=dag)
dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)