Pass image_pull_policy in KubernetesPodOperator correctly (#13289)

* pass image_pull_policy to V1Container

image_pull_policy is not being passed into the V1Container in
KubernetesPodOperator. This commit fixes this.

* add test for image_pull_policy not set

image_pull_policy should be IfNotPresent by default if
it's not set. The test ensure the correct value is passed
to the V1Container object.

(cherry picked from commit 7a560ab6de)
This commit is contained in:
Kevin Yuen 2020-12-24 12:04:39 -05:00 коммит произвёл Kaxil Naik
Родитель 60493a764c
Коммит 95b02c4710
2 изменённых файлов: 54 добавлений и 0 удалений

Просмотреть файл

@ -431,6 +431,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
name="base",
command=self.cmds,
ports=self.ports,
image_pull_policy=self.image_pull_policy,
resources=self.k8s_resources,
volume_mounts=self.volume_mounts,
args=self.arguments,

Просмотреть файл

@ -99,6 +99,59 @@ class TestKubernetesPodOperator(unittest.TestCase):
[k8s.V1LocalObjectReference(name=fake_pull_secrets)],
)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_image_pull_policy_not_set(self, mock_client, monitor_mock, start_mock):
from airflow.utils.state import State
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
in_cluster=False,
do_xcom_push=False,
cluster_context='default',
)
monitor_mock.return_value = (State.SUCCESS, None)
context = self.create_context(k)
k.execute(context=context)
self.assertEqual(
start_mock.call_args[0][0].spec.containers[0].image_pull_policy,
'IfNotPresent',
)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_image_pull_policy_correctly_set(self, mock_client, monitor_mock, start_mock):
from airflow.utils.state import State
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
in_cluster=False,
do_xcom_push=False,
image_pull_policy='Always',
cluster_context='default',
)
monitor_mock.return_value = (State.SUCCESS, None)
context = self.create_context(k)
k.execute(context=context)
self.assertEqual(
start_mock.call_args[0][0].spec.containers[0].image_pull_policy,
'Always',
)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")