From 94d3ed61d60b134d649a4e9785b2d9c2a88cff05 Mon Sep 17 00:00:00 2001 From: Daniel Imberman Date: Thu, 21 Jan 2021 12:57:35 -0800 Subject: [PATCH] Fix error with quick-failing tasks in KubernetesPodOperator (#13621) * Fix error with quick-failing tasks in KubernetesPodOperator Addresses an issue with the KubernetesPodOperator where tasks that die quickly are not patched with "already_checked" because they never make it to the monitoring logic. * static fix --- .../kubernetes/operators/kubernetes_pod.py | 1 + .../test_kubernetes_pod_operator.py | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 3f42ab1063..996b7dab60 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -343,6 +343,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {status}') return result except AirflowException as ex: + self.patch_already_checked(self.pod) raise AirflowException(f'Pod Launching failed: {ex}') def handle_pod_overlap( diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 574f7ffa3a..e86030312e 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -1000,5 +1000,30 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): k.execute(context) create_mock.assert_called_once() + def test_reatttach_quick_failure(self): + client = kube_client.get_kube_client(in_cluster=False) + namespace = "default" + + name = "test" + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["exit 1"], + labels={"foo": "bar"}, + name="test", + task_id=name, + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=False, + termination_grace_period=0, + ) + + context = create_context(k) + with self.assertRaises(AirflowException): + k.execute(context) + pod = client.read_namespaced_pod(name=k.pod.metadata.name, namespace=namespace) + self.assertEqual(pod.metadata.labels["already_checked"], "True") + # pylint: enable=unused-argument