From 112f7d716900556a7a41e3a8eea197f6bcfe9ed9 Mon Sep 17 00:00:00 2001
From: Tomek Urbaszek
Date: Sat, 17 Oct 2020 12:31:07 +0200
Subject: [PATCH] Add creating_job_id to DagRun table (#11396)

This PR introduces a creating_job_id column in the DagRun table that links
a DagRun to the job that created it.

Part of #11302

Co-authored-by: Kaxil Naik
---
NOTE(review): in airflow/jobs/base_job.py the new dag_runs relationship is
inserted between the existing relationship (backref 'queued_by_job') and the
string block that begins "TaskInstances which have been enqueued by this
Job." That description therefore now sits directly under dag_runs, although
it presumably documents the earlier relationship. Consider moving dag_runs
below that string block (its end is outside this hunk's context).

 airflow/jobs/backfill_job.py                  |  1 +
 airflow/jobs/base_job.py                      |  9 ++++
 airflow/jobs/scheduler_job.py                 |  3 +-
 ...364159666cbd_add_job_id_to_dagrun_table.py | 44 +++++++++++++++++++
 airflow/models/dag.py                         |  8 +++-
 airflow/models/dagrun.py                      |  3 ++
 tests/jobs/test_backfill_job.py               | 14 ++++++
 tests/jobs/test_scheduler_job.py              | 27 ++++++++++++
 tests/models/test_dag.py                      |  8 ++++
 9 files changed, 114 insertions(+), 3 deletions(-)
 create mode 100644 airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py

diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 4949f584a4..de7c53b15d 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -325,6 +325,7 @@ class BackfillJob(BaseJob):
             session=session,
             conf=self.conf,
             run_type=DagRunType.BACKFILL_JOB,
+            creating_job_id=self.id,
         )
 
         # set required transient field
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 2e37b0d6c3..a6b418a228 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -29,6 +29,7 @@ from sqlalchemy.orm.session import make_transient
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
+from airflow.models import DagRun
 from airflow.models.base import ID_LEN, Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.stats import Stats
@@ -78,6 +79,14 @@ class BaseJob(Base, LoggingMixin):
         foreign_keys=id,
         backref=backref('queued_by_job', uselist=False),
     )
+
+    dag_runs = relationship(
+        DagRun,
+        primaryjoin=id == DagRun.creating_job_id,
+        foreign_keys=id,
+        backref=backref('creating_job'),
+    )
+
     """
     TaskInstances which have been enqueued by this Job.
 
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 24259629d8..ba472c46c5 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1563,7 +1563,8 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 state=State.RUNNING,
                 external_trigger=False,
                 session=session,
-                dag_hash=dag_hash
+                dag_hash=dag_hash,
+                creating_job_id=self.id,
             )
 
         self._update_dag_next_dagruns(dag_models, session)
diff --git a/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py b/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py
new file mode 100644
index 0000000000..b4e1fae11f
--- /dev/null
+++ b/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add creating_job_id to DagRun table
+
+Revision ID: 364159666cbd
+Revises: 849da589634d
+Create Date: 2020-10-10 09:08:07.332456
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '364159666cbd'
+down_revision = '849da589634d'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add creating_job_id to DagRun table"""
+    op.add_column('dag_run', sa.Column('creating_job_id', sa.Integer))
+
+
+def downgrade():
+    """Unapply Add creating_job_id to DagRun table"""
+    op.drop_column('dag_run', 'creating_job_id')
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 45269f3a84..1d928d2f2b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1655,7 +1655,8 @@ class DAG(BaseDag, LoggingMixin):
         conf=None,
         run_type=None,
         session=None,
-        dag_hash=None
+        dag_hash=None,
+        creating_job_id=None,
     ):
         """
         Creates a dag run from this dag including the tasks associated with this dag.
@@ -1675,6 +1676,8 @@ class DAG(BaseDag, LoggingMixin):
         :type external_trigger: bool
         :param conf: Dict containing configuration/parameters to pass to the DAG
         :type conf: dict
+        :param creating_job_id: id of the job creating this DagRun
+        :type creating_job_id: int
         :param session: database session
         :type session: sqlalchemy.orm.session.Session
         :param dag_hash: Hash of Serialized DAG
@@ -1702,7 +1705,8 @@ class DAG(BaseDag, LoggingMixin):
             conf=conf,
             state=state,
             run_type=run_type.value,
-            dag_hash=dag_hash
+            dag_hash=dag_hash,
+            creating_job_id=creating_job_id
         )
         session.add(run)
         session.flush()
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 39e2348961..07d83c5216 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -58,6 +58,7 @@ class DagRun(Base, LoggingMixin):
     end_date = Column(UtcDateTime)
     _state = Column('state', String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
+    creating_job_id = Column(Integer)
     external_trigger = Column(Boolean, default=True)
     run_type = Column(String(50), nullable=False)
     conf = Column(PickleType)
@@ -98,6 +99,7 @@ class DagRun(Base, LoggingMixin):
         state: Optional[str] = None,
         run_type: Optional[str] = None,
         dag_hash: Optional[str] = None,
+        creating_job_id: Optional[int] = None,
     ):
         self.dag_id = dag_id
         self.run_id = run_id
@@ -108,6 +110,7 @@ class DagRun(Base, LoggingMixin):
         self.state = state
         self.run_type = run_type
         self.dag_hash = dag_hash
+        self.creating_job_id = creating_job_id
         super().__init__()
 
     def __repr__(self):
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index ceb571c54a..3041ad075d 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1536,3 +1536,17 @@ class TestBackfillJob(unittest.TestCase):
         ti2.refresh_from_db(session=session)
         self.assertEqual(State.SCHEDULED, ti1.state)
         self.assertEqual(State.NONE, ti2.state)
+
+    def test_job_id_is_assigned_to_dag_run(self):
+        dag_id = 'test_job_id_is_assigned_to_dag_run'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        DummyOperator(task_id="dummy_task", dag=dag)
+
+        job = BackfillJob(
+            dag=dag,
+            executor=MockExecutor(),
+            start_date=datetime.datetime.now() - datetime.timedelta(days=1)
+        )
+        job.run()
+        dr: DagRun = dag.get_last_dagrun()
+        assert dr.creating_job_id == job.id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 14e4a4f0bb..2e2b56b58b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3488,6 +3488,33 @@ class TestSchedulerJob(unittest.TestCase):
             full_filepath=dag.fileloc, dag_id=dag_id
         )
 
+    def test_scheduler_sets_job_id_on_dag_run(self):
+        dag = DAG(
+            dag_id='test_scheduler_sets_job_id_on_dag_run',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+        )
+
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db()
+        dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        with create_session() as session:
+            scheduler._create_dag_runs([dag_model], session)
+
+        assert dag.get_last_dagrun().creating_job_id == scheduler.id
+
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 3280922d24..afe42e3d63 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1348,6 +1348,14 @@ class TestDag(unittest.TestCase):
         dr = dag.create_dagrun(run_id="custom_is_set_to_manual", state=State.NONE)
         assert dr.run_type == DagRunType.MANUAL.value
 
+    def test_create_dagrun_job_id_is_set(self):
+        job_id = 42
+        dag = DAG(dag_id="test_create_dagrun_job_id_is_set")
+        dr = dag.create_dagrun(
+            run_id="test_create_dagrun_job_id_is_set", state=State.NONE, creating_job_id=job_id
+        )
+        assert dr.creating_job_id == job_id
+
     @parameterized.expand(
         [
             (State.NONE,),