Fix error with quick-failing tasks in KubernetesPodOperator (#13621)
* Fix error with quick-failing tasks in KubernetesPodOperator Addresses an issue with the KubernetesPodOperator where tasks that die quickly are not patched with "already_checked" because they never make it to the monitoring logic. * static fix
This commit is contained in:
Родитель
10b8ecc86f
Коммит
94d3ed61d6
|
@ -343,6 +343,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
|
||||||
raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {status}')
|
raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {status}')
|
||||||
return result
|
return result
|
||||||
except AirflowException as ex:
|
except AirflowException as ex:
|
||||||
|
self.patch_already_checked(self.pod)
|
||||||
raise AirflowException(f'Pod Launching failed: {ex}')
|
raise AirflowException(f'Pod Launching failed: {ex}')
|
||||||
|
|
||||||
def handle_pod_overlap(
|
def handle_pod_overlap(
|
||||||
|
|
|
@ -1000,5 +1000,30 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
|
||||||
k.execute(context)
|
k.execute(context)
|
||||||
create_mock.assert_called_once()
|
create_mock.assert_called_once()
|
||||||
|
|
||||||
|
def test_reatttach_quick_failure(self):
|
||||||
|
client = kube_client.get_kube_client(in_cluster=False)
|
||||||
|
namespace = "default"
|
||||||
|
|
||||||
|
name = "test"
|
||||||
|
k = KubernetesPodOperator(
|
||||||
|
namespace='default',
|
||||||
|
image="ubuntu:16.04",
|
||||||
|
cmds=["bash", "-cx"],
|
||||||
|
arguments=["exit 1"],
|
||||||
|
labels={"foo": "bar"},
|
||||||
|
name="test",
|
||||||
|
task_id=name,
|
||||||
|
in_cluster=False,
|
||||||
|
do_xcom_push=False,
|
||||||
|
is_delete_operator_pod=False,
|
||||||
|
termination_grace_period=0,
|
||||||
|
)
|
||||||
|
|
||||||
|
context = create_context(k)
|
||||||
|
with self.assertRaises(AirflowException):
|
||||||
|
k.execute(context)
|
||||||
|
pod = client.read_namespaced_pod(name=k.pod.metadata.name, namespace=namespace)
|
||||||
|
self.assertEqual(pod.metadata.labels["already_checked"], "True")
|
||||||
|
|
||||||
|
|
||||||
# pylint: enable=unused-argument
|
# pylint: enable=unused-argument
|
||||||
|
|
Загрузка…
Ссылка в новой задаче