From 16bafda656cb4dcc421bceefdd0ade51b518e034 Mon Sep 17 00:00:00 2001 From: Scott Seaton Date: Sun, 28 Aug 2022 15:22:26 -0400 Subject: [PATCH] backport of fix for scheduler adoption --- airflow/executors/kubernetes_executor.py | 6 +- tests/executors/test_kubernetes_executor.py | 76 +++++++++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index ec7cbf7fd9..6a79d033db 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -586,15 +586,15 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): self.event_buffer[key] = state, None def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]: - tis_to_flush = [ti for ti in tis if not ti.external_executor_id] - scheduler_job_ids = [ti.external_executor_id for ti in tis] + tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id] + scheduler_job_ids = {ti.queued_by_job_id for ti in tis} pod_ids = { create_pod_id( dag_id=pod_generator.make_safe_label_value(ti.dag_id), task_id=pod_generator.make_safe_label_value(ti.task_id), ): ti for ti in tis - if ti.external_executor_id + if ti.queued_by_job_id } kube_client: client.CoreV1Api = self.kube_client for scheduler_job_id in scheduler_job_ids: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 8d3d5b4d45..ab476fdf04 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -419,6 +419,82 @@ class TestKubernetesExecutor(unittest.TestCase): assert not mock_kube_client.patch_namespaced_pod.called assert pod_ids == {"foobar": {}} + @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task') + @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods') + def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_launched_task): + executor = self.kubernetes_executor + executor.scheduler_job_id = "10" + mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task") + pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", labels={"dag_id": "dag", "task_id": "task"})) + pod_id = create_pod_id(dag_id="dag", task_id="task") + mock_kube_client = mock.MagicMock() + mock_kube_client.list_namespaced_pod.return_value.items = [pod] + executor.kube_client = mock_kube_client + + # First adoption + executor.try_adopt_task_instances([mock_ti]) + mock_kube_client.list_namespaced_pod.assert_called_once_with( + namespace='default', label_selector='airflow-worker=1' + ) + mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti}) + mock_adopt_completed_pods.assert_called_once() + # We aren't checking the return value of `try_adopt_task_instances` because it relies on + # `adopt_launched_task` mutating its arg. This should be refactored, but not right now. + + # Second adoption (queued_by_job_id and external_executor_id no longer match) + mock_kube_client.reset_mock() + mock_adopt_launched_task.reset_mock() + mock_adopt_completed_pods.reset_mock() + + mock_ti.queued_by_job_id = "10" # scheduler_job would have updated this after the first adoption + executor.scheduler_job_id = "20" + + executor.try_adopt_task_instances([mock_ti]) + mock_kube_client.list_namespaced_pod.assert_called_once_with( + namespace='default', label_selector='airflow-worker=10' + ) + mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti}) + mock_adopt_completed_pods.assert_called_once() + + @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods') + def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_completed_pods): + """We try to find pods only once per scheduler id""" + executor = self.kubernetes_executor + mock_kube_client = mock.MagicMock() + executor.kube_client = mock_kube_client + + mock_tis = [ + mock.MagicMock(queued_by_job_id="10", external_executor_id="1", dag_id="dag", task_id="task"), + mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task2"), + mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task3"), + ] + + executor.try_adopt_task_instances(mock_tis) + assert mock_kube_client.list_namespaced_pod.call_count == 2 + mock_kube_client.list_namespaced_pod.assert_has_calls( + [ + mock.call(namespace='default', label_selector='airflow-worker=10'), + mock.call(namespace='default', label_selector='airflow-worker=40'), + ], + any_order=True, + ) + + @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task') + @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods') + def test_try_adopt_task_instances_no_matching_pods( + self, mock_adopt_completed_pods, mock_adopt_launched_task + ): + executor = self.kubernetes_executor + mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task") + mock_kube_client = mock.MagicMock() + mock_kube_client.list_namespaced_pod.return_value.items = [] + executor.kube_client = mock_kube_client + + tis_to_flush = executor.try_adopt_task_instances([mock_ti]) + assert tis_to_flush == [mock_ti] + mock_adopt_launched_task.assert_not_called() + mock_adopt_completed_pods.assert_called_once() + class TestKubernetesJobWatcher(unittest.TestCase): def setUp(self):