[AIRFLOW-6516] BugFix: airflow.cfg does not exist in Volume Mounts (#7109)
This commit is contained in:
Родитель
a6676c4711
Коммит
a0a02dfcd8
|
@ -282,10 +282,10 @@ class WorkerConfiguration(LoggingMixin):
|
|||
|
||||
# Mount the airflow_local_settings.py file via a configmap the user has specified
|
||||
if self.kube_config.airflow_local_settings_configmap:
|
||||
config_volume_name = 'airflow-config'
|
||||
config_volume_name = 'airflow-local-settings'
|
||||
config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
|
||||
volume_mounts[config_volume_name] = k8s.V1VolumeMount(
|
||||
name=config_volume_name,
|
||||
name='airflow-config',
|
||||
mount_path=config_path,
|
||||
sub_path='airflow_local_settings.py',
|
||||
read_only=True
|
||||
|
@ -350,16 +350,6 @@ class WorkerConfiguration(LoggingMixin):
|
|||
)
|
||||
)
|
||||
|
||||
# Mount the airflow.cfg file via a configmap the user has specified
|
||||
if self.kube_config.airflow_configmap:
|
||||
config_volume_name = 'airflow-config'
|
||||
volumes[config_volume_name] = k8s.V1Volume(
|
||||
name=config_volume_name,
|
||||
config_map=k8s.V1ConfigMapVolumeSource(
|
||||
name=self.kube_config.airflow_configmap
|
||||
)
|
||||
)
|
||||
|
||||
# Mount the airflow_local_settings.py file via a configmap the user has specified
|
||||
if self.kube_config.airflow_local_settings_configmap:
|
||||
config_volume_name = 'airflow-config'
|
||||
|
@ -370,6 +360,16 @@ class WorkerConfiguration(LoggingMixin):
|
|||
)
|
||||
)
|
||||
|
||||
# Mount the airflow.cfg file via a configmap the user has specified
|
||||
if self.kube_config.airflow_configmap:
|
||||
config_volume_name = 'airflow-config'
|
||||
volumes[config_volume_name] = k8s.V1Volume(
|
||||
name=config_volume_name,
|
||||
config_map=k8s.V1ConfigMapVolumeSource(
|
||||
name=self.kube_config.airflow_configmap
|
||||
)
|
||||
)
|
||||
|
||||
return list(volumes.values())
|
||||
|
||||
def generate_dag_volume_mount_path(self) -> str:
|
||||
|
|
|
@ -96,6 +96,9 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
|
|||
self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
|
||||
self.api_client = ApiClient()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.kube_config = None
|
||||
|
||||
def test_worker_configuration_no_subpaths(self):
|
||||
self.kube_config.dags_volume_claim = 'airflow-dags'
|
||||
self.kube_config.dags_folder = 'dags'
|
||||
|
@ -608,6 +611,55 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
|
|||
self.assertEqual(0, len(dag_volume_mount))
|
||||
self.assertEqual(0, len(init_containers))
|
||||
|
||||
def test_set_airflow_config_configmap(self):
|
||||
"""
|
||||
Test that airflow.cfg can be set via configmap by
|
||||
checking volume & volume-mounts are set correctly.
|
||||
"""
|
||||
self.kube_config.airflow_home = '/usr/local/airflow'
|
||||
self.kube_config.airflow_configmap = 'airflow-configmap'
|
||||
self.kube_config.airflow_local_settings_configmap = None
|
||||
self.kube_config.dags_folder = '/workers/path/to/dags'
|
||||
|
||||
worker_config = WorkerConfiguration(self.kube_config)
|
||||
pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
|
||||
"test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
|
||||
|
||||
pod_spec_dict = pod.spec.to_dict()
|
||||
|
||||
airflow_config_volume = [
|
||||
volume for volume in pod_spec_dict['volumes'] if volume["name"] == 'airflow-config'
|
||||
]
|
||||
# Test that volume_name is found
|
||||
self.assertEqual(1, len(airflow_config_volume))
|
||||
|
||||
# Test that config map exists
|
||||
self.assertEqual(
|
||||
{'default_mode': None, 'items': None, 'name': 'airflow-configmap', 'optional': None},
|
||||
airflow_config_volume[0]['config_map']
|
||||
)
|
||||
|
||||
# Test that only 1 Volume Mounts exists with 'airflow-config' name
|
||||
# One for airflow.cfg
|
||||
volume_mounts = [
|
||||
volume_mount for volume_mount in pod_spec_dict['containers'][0]['volume_mounts']
|
||||
if volume_mount['name'] == 'airflow-config'
|
||||
]
|
||||
|
||||
self.assertEqual(
|
||||
[
|
||||
{
|
||||
'mount_path': '/usr/local/airflow/airflow.cfg',
|
||||
'mount_propagation': None,
|
||||
'name': 'airflow-config',
|
||||
'read_only': True,
|
||||
'sub_path': 'airflow.cfg',
|
||||
'sub_path_expr': None
|
||||
}
|
||||
],
|
||||
volume_mounts
|
||||
)
|
||||
|
||||
def test_set_airflow_local_settings_configmap(self):
|
||||
"""
|
||||
Test that airflow_local_settings.py can be set via configmap by
|
||||
|
@ -622,29 +674,50 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
|
|||
pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
|
||||
"test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
|
||||
|
||||
pod_spec_dict = pod.spec.to_dict()
|
||||
|
||||
airflow_config_volume = [
|
||||
volume for volume in pod.spec.volumes if volume.name == 'airflow-config'
|
||||
volume for volume in pod_spec_dict['volumes'] if volume["name"] == 'airflow-config'
|
||||
]
|
||||
# Test that volume_name is found
|
||||
self.assertEqual(1, len(airflow_config_volume))
|
||||
|
||||
# Test that config map exists
|
||||
self.assertEqual("airflow-configmap", airflow_config_volume[0].config_map.name)
|
||||
|
||||
# Test Volume Mount exists
|
||||
local_setting_volume_mount = [
|
||||
volume_mount for volume_mount in pod.spec.containers[0].volume_mounts
|
||||
if volume_mount.name == 'airflow-config'
|
||||
]
|
||||
self.assertEqual(1, len(local_setting_volume_mount))
|
||||
|
||||
# Test Mounth Path is set correctly.
|
||||
self.assertEqual(
|
||||
'/usr/local/airflow/config/airflow_local_settings.py',
|
||||
local_setting_volume_mount[0].mount_path
|
||||
{'default_mode': None, 'items': None, 'name': 'airflow-configmap', 'optional': None},
|
||||
airflow_config_volume[0]['config_map']
|
||||
)
|
||||
|
||||
# Test that 2 Volume Mounts exists and has 2 different mount-paths
|
||||
# One for airflow.cfg
|
||||
# Second for airflow_local_settings.py
|
||||
volume_mounts = [
|
||||
volume_mount for volume_mount in pod_spec_dict['containers'][0]['volume_mounts']
|
||||
if volume_mount['name'] == 'airflow-config'
|
||||
]
|
||||
self.assertEqual(2, len(volume_mounts))
|
||||
|
||||
self.assertEqual(
|
||||
[
|
||||
{
|
||||
'mount_path': '/usr/local/airflow/airflow.cfg',
|
||||
'mount_propagation': None,
|
||||
'name': 'airflow-config',
|
||||
'read_only': True,
|
||||
'sub_path': 'airflow.cfg',
|
||||
'sub_path_expr': None
|
||||
},
|
||||
{
|
||||
'mount_path': '/usr/local/airflow/config/airflow_local_settings.py',
|
||||
'mount_propagation': None,
|
||||
'name': 'airflow-config',
|
||||
'read_only': True,
|
||||
'sub_path': 'airflow_local_settings.py',
|
||||
'sub_path_expr': None
|
||||
}
|
||||
],
|
||||
volume_mounts
|
||||
)
|
||||
self.assertEqual(True, local_setting_volume_mount[0].read_only)
|
||||
self.assertEqual('airflow_local_settings.py', local_setting_volume_mount[0].sub_path)
|
||||
|
||||
def test_kubernetes_environment_variables(self):
|
||||
# Tests the kubernetes environment variables get copied into the worker pods
|
||||
|
|
Загрузка…
Ссылка в новой задаче